From f0ed13d7f5e058029b07aed10eb6f5bb002403c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kasper=20Overg=C3=A5rd=20Nielsen?= Date: Thu, 28 Apr 2022 11:38:08 +0200 Subject: [PATCH] Support async SubscriptionSet.waitForStateChange This requires support for out-of-isolate callbacks. To do this a dart specific EventLoopDispatcher and DispatchFreeUserdata is introduced. --- ffigen/config.yaml | 2 + ffigen/subscription_set.h | 1 + lib/src/app.dart | 2 +- lib/src/native/realm_bindings.dart | 86 +++++++++++++++--- lib/src/native/realm_core.dart | 29 ++++++- lib/src/realm_class.dart | 2 +- lib/src/subscription.dart | 12 +-- src/CMakeLists.txt | 5 +- src/event_loop_dispatcher.hpp | 134 +++++++++++++++++++++++++++++ src/realm_dart.h | 1 - src/subscription_set.cpp | 59 +++++++++++++ src/subscription_set.h | 45 ++++++++++ test/subscription_test.dart | 3 +- test/test.dart | 2 - 14 files changed, 353 insertions(+), 30 deletions(-) create mode 120000 ffigen/subscription_set.h create mode 100644 src/event_loop_dispatcher.hpp create mode 100644 src/subscription_set.cpp create mode 100644 src/subscription_set.h diff --git a/ffigen/config.yaml b/ffigen/config.yaml index ebdf2300d7..22243f7d8b 100644 --- a/ffigen/config.yaml +++ b/ffigen/config.yaml @@ -7,11 +7,13 @@ headers: - 'realm_dart.h' - 'realm_dart_scheduler.h' - 'realm_android_platform.h' + - 'subscription_set.h' include-directives: #generate only for these headers - 'realm.h' - 'realm_dart.h' - 'realm_dart_scheduler.h' - 'realm_android_platform.h' + - 'subscription_set.h' compiler-opts: - '-DRLM_NO_ANON_UNIONS' - '-DFFI_GEN' diff --git a/ffigen/subscription_set.h b/ffigen/subscription_set.h new file mode 120000 index 0000000000..49920850eb --- /dev/null +++ b/ffigen/subscription_set.h @@ -0,0 +1 @@ +../src/subscription_set.h \ No newline at end of file diff --git a/lib/src/app.dart b/lib/src/app.dart index 43a973cb13..4a81865372 100644 --- a/lib/src/app.dart +++ b/lib/src/app.dart @@ -143,7 +143,7 @@ class App { /// Removes the user's local credentials and attempts to invalidate their refresh token from the server. /// /// If [user] is null logs out [currentUser] if it exists. - Future logout([User? user]) async { + Future logOut([User? user]) async { user ??= currentUser; if (user == null) { return; diff --git a/lib/src/native/realm_bindings.dart b/lib/src/native/realm_bindings.dart index ab39c6e90f..e6d525464d 100644 --- a/lib/src/native/realm_bindings.dart +++ b/lib/src/native/realm_bindings.dart @@ -7028,7 +7028,7 @@ class RealmLibrary { ffi.Pointer Function( ffi.Pointer)>(); - /// Wait uptill subscripton set state is equal to the state passed as parameter. + /// Wait until subscripton set state is equal to the state passed as parameter. /// This is a blocking operation. /// @return the current subscription state int realm_sync_on_subscription_set_state_change_wait( @@ -7053,27 +7053,39 @@ class RealmLibrary { /// This is an asynchronous operation. /// @return true/false if the handler was registered correctly bool realm_sync_on_subscription_set_state_change_async( - ffi.Pointer arg0, - int arg1, - realm_sync_on_subscription_state_changed arg2, + ffi.Pointer subscription_set, + int notify_when, + realm_sync_on_subscription_state_changed callback, + ffi.Pointer userdata, + realm_free_userdata_func_t userdata_free, ) { return _realm_sync_on_subscription_set_state_change_async( - arg0, - arg1, - arg2, + subscription_set, + notify_when, + callback, + userdata, + userdata_free, ) != 0; } late final _realm_sync_on_subscription_set_state_change_asyncPtr = _lookup< ffi.NativeFunction< - ffi.Uint8 Function(ffi.Pointer, - ffi.Int32, realm_sync_on_subscription_state_changed)>>( + ffi.Uint8 Function( + ffi.Pointer, + ffi.Int32, + realm_sync_on_subscription_state_changed, + ffi.Pointer, + realm_free_userdata_func_t)>>( 'realm_sync_on_subscription_set_state_change_async'); late final _realm_sync_on_subscription_set_state_change_async = _realm_sync_on_subscription_set_state_change_asyncPtr.asFunction< - int Function(ffi.Pointer, int, - realm_sync_on_subscription_state_changed)>(); + int Function( + ffi.Pointer, + int, + realm_sync_on_subscription_state_changed, + ffi.Pointer, + realm_free_userdata_func_t)>(); /// Retrieve version for the subscription set passed as parameter /// @return subscription set version if the poiter to the subscription is valid @@ -8002,6 +8014,54 @@ class RealmLibrary { 'realm_dart_get_files_path'); late final _realm_dart_get_files_path = _realm_dart_get_files_pathPtr .asFunction Function()>(); + + /// Register a handler in order to be notified when subscription set is equal to the one passed as parameter + /// This is an asynchronous operation. + /// + /// @return true/false if the handler was registered correctly + /// + /// This is dart specific version of realm_dart_on_subscription_set_state_change_async. + /// Unlike the original method, this one uses event_loop_dispatcher to ensure the callback + /// is handled on the correct isolate thread. + bool realm_dart_sync_on_subscription_set_state_change_async( + ffi.Pointer subscription_set, + int notify_when, + realm_sync_on_subscription_state_changed callback, + ffi.Pointer userdata, + realm_free_userdata_func_t userdata_free, + ffi.Pointer scheduler, + ) { + return _realm_dart_sync_on_subscription_set_state_change_async( + subscription_set, + notify_when, + callback, + userdata, + userdata_free, + scheduler, + ) != + 0; + } + + late final _realm_dart_sync_on_subscription_set_state_change_asyncPtr = + _lookup< + ffi.NativeFunction< + ffi.Uint8 Function( + ffi.Pointer, + ffi.Int32, + realm_sync_on_subscription_state_changed, + ffi.Pointer, + realm_free_userdata_func_t, + ffi.Pointer)>>( + 'realm_dart_sync_on_subscription_set_state_change_async'); + late final _realm_dart_sync_on_subscription_set_state_change_async = + _realm_dart_sync_on_subscription_set_state_change_asyncPtr.asFunction< + int Function( + ffi.Pointer, + int, + realm_sync_on_subscription_state_changed, + ffi.Pointer, + realm_free_userdata_func_t, + ffi.Pointer)>(); } class shared_realm extends ffi.Opaque {} @@ -8924,9 +8984,7 @@ typedef realm_sync_ssl_verify_func_t = ffi.Pointer< ffi.Int32)>>; typedef realm_flx_sync_subscription_set_t = realm_flx_sync_subscription_set; typedef realm_sync_on_subscription_state_changed = ffi.Pointer< - ffi.NativeFunction< - ffi.Void Function( - ffi.Pointer, ffi.Int32)>>; + ffi.NativeFunction, ffi.Int32)>>; typedef realm_flx_sync_subscription_t = realm_flx_sync_subscription; typedef realm_flx_sync_mutable_subscription_set_t = realm_flx_sync_mutable_subscription_set; diff --git a/lib/src/native/realm_core.dart b/lib/src/native/realm_core.dart index 521bd0d395..98d8ccedb5 100644 --- a/lib/src/native/realm_core.dart +++ b/lib/src/native/realm_core.dart @@ -229,18 +229,39 @@ class _RealmCore { ))); } - static void _stateChangeCallback(Pointer subscriptionSetPtr, int state) { + static void _stateChangeCallback(Pointer userdata, int state) { + final completer = userdata.toObject>(isPersistent: true); + if (completer == null) { + return; + } + + // TODO: What about errors?! + + completer.complete(state); } void waitForSubscriptionSetStateChangeSync(SubscriptionSet subscriptions, SubscriptionSetState state) { _realmLib.realm_sync_on_subscription_set_state_change_wait(subscriptions.handle._pointer, state.index); } + Future waitForSubscriptionSetStateChange(SubscriptionSet subscriptions, SubscriptionSetState notifyWhen) { + final completer = Completer(); + _realmLib.realm_dart_sync_on_subscription_set_state_change_async( + subscriptions.handle._pointer, + notifyWhen.index, + Pointer.fromFunction(_stateChangeCallback), + completer.toPersistentHandle(), + _deletePersistentHandleFuncPtr, + subscriptions.realm.scheduler.handle._pointer, + ); + return completer.future; + } + MutableSubscriptionSetHandle makeSubscriptionSetMutable(SubscriptionSet subscriptions) { return MutableSubscriptionSetHandle._(_realmLib.invokeGetPointer(() => _realmLib.realm_sync_make_subscription_set_mutable(subscriptions.handle._pointer))); } - SubscriptionSetHandle subscriptionSetCommit(MutableSubscriptionSet subscriptions){ + SubscriptionSetHandle subscriptionSetCommit(MutableSubscriptionSet subscriptions) { return SubscriptionSetHandle._(_realmLib.invokeGetPointer(() => _realmLib.realm_sync_subscription_set_commit(subscriptions.mutableHandle._pointer))); } @@ -948,8 +969,10 @@ class _RealmCore { }); } + final doNotDie = Set(); AppHandle getApp(AppConfiguration configuration) { final httpTransportHandle = _createHttpTransport(configuration.httpClient); + doNotDie.add(httpTransportHandle); final appConfigHandle = _createAppConfig(configuration, httpTransportHandle); final syncClientConfigHandle = _createSyncClientConfig(configuration); final realmAppPtr = _realmLib.invokeGetPointer(() => _realmLib.realm_app_get(appConfigHandle._pointer, syncClientConfigHandle._pointer)); @@ -1119,7 +1142,7 @@ class _RealmCore { } static void _logOutCallback(Pointer userdata, Pointer error) { - final Completer? completer = userdata.toObject(); + final Completer? completer = userdata.toObject(isPersistent: true); if (completer == null) { return; } diff --git a/lib/src/realm_class.dart b/lib/src/realm_class.dart index 8ae2853127..56e57ead87 100644 --- a/lib/src/realm_class.dart +++ b/lib/src/realm_class.dart @@ -270,7 +270,7 @@ class Realm { SubscriptionSet? _subscriptions; SubscriptionSet? get subscriptions { if (config is FlexibleSyncConfiguration) { - _subscriptions ??= SubscriptionSetInternal.create(realmCore.getSubscriptions(this)); + _subscriptions ??= SubscriptionSetInternal.create(this, realmCore.getSubscriptions(this)); // TODO: Refresh _subscriptions, if needed. } return _subscriptions; diff --git a/lib/src/subscription.dart b/lib/src/subscription.dart index 9a671025e9..617975646b 100644 --- a/lib/src/subscription.dart +++ b/lib/src/subscription.dart @@ -51,9 +51,10 @@ enum SubscriptionSetState { } abstract class SubscriptionSet with IterableMixin { + Realm _realm; SubscriptionSetHandle _handle; - SubscriptionSet._(this._handle); + SubscriptionSet._(this._realm, this._handle); Subscription? find(RealmResults query) { return Subscription._(realmCore.findSubscriptionByQuery(this, query)); @@ -63,8 +64,8 @@ abstract class SubscriptionSet with IterableMixin { return Subscription._(realmCore.findSubscriptionByName(this, name)); } - void waitForStateChange(SubscriptionSetState state) { - realmCore.waitForSubscriptionSetStateChangeSync(this, state); + Future waitForStateChange(SubscriptionSetState state) async { + return SubscriptionSetState.values[await realmCore.waitForSubscriptionSetStateChange(this, state)]; } @override @@ -82,15 +83,16 @@ abstract class SubscriptionSet with IterableMixin { } extension SubscriptionSetInternal on SubscriptionSet { + Realm get realm => _realm; SubscriptionSetHandle get handle => _handle; - static SubscriptionSet create(SubscriptionSetHandle handle) => MutableSubscriptionSet._(handle); + static SubscriptionSet create(Realm realm, SubscriptionSetHandle handle) => MutableSubscriptionSet._(realm, handle); } class MutableSubscriptionSet extends SubscriptionSet { MutableSubscriptionSetHandle? _mutableHandle; - MutableSubscriptionSet._(SubscriptionSetHandle handle) : super._(handle); + MutableSubscriptionSet._(Realm realm, SubscriptionSetHandle handle) : super._(realm, handle); @override void update(void Function(MutableSubscriptionSet mutableSubscriptions) action) { diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6c2c337a9c..be68832c9a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,18 +1,21 @@ set(SOURCES realm_dart.cpp realm_dart_scheduler.cpp - dart-include/dart_api_dl.c + dart-include/dart_api_dl.c + subscription_set.cpp ) set(HEADERS realm_dart.h realm_dart_scheduler.h realm-core/src/realm.h + subscription_set.h ) set (INCLUDE_DIRS dart-include dart-include/internal + ${CMAKE_BINARY_DIR}/src/realm-core/src ) if(NOT CMAKE_SYSTEM_NAME STREQUAL iOS) diff --git a/src/event_loop_dispatcher.hpp b/src/event_loop_dispatcher.hpp new file mode 100644 index 0000000000..537f61277a --- /dev/null +++ b/src/event_loop_dispatcher.hpp @@ -0,0 +1,134 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2019 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#ifndef REALM_DART_EVENT_LOOP_DISPATCHER_HPP +#define REALM_DART_EVENT_LOOP_DISPATCHER_HPP + +#include + +#include +#include +#include "dart_api_dl.h" + +namespace realm::util { + +template +class EventLoopDispatcher; + +template +class EventLoopDispatcher { + using Tuple = std::tuple::type...>; + +private: + const std::shared_ptr> m_func; + std::shared_ptr m_scheduler; // = util::Scheduler::make_default(); + +public: + EventLoopDispatcher(std::shared_ptr scheduler, util::UniqueFunction func) + : m_func(std::make_shared>(std::move(func))), m_scheduler(scheduler) + { + } + + const util::UniqueFunction& func() const + { + return *m_func; + } + + void operator()(Args... args) + { + m_scheduler->invoke( + [scheduler = m_scheduler, func = m_func, args = std::make_tuple(std::forward(args)...)]() mutable { + std::apply(*func, std::move(args)); + // Each invocation block will retain the scheduler, so the scheduler + // will not be released until all blocks are called + }); + } +}; + +namespace _impl::ForEventLoopDispatcher { +template +struct ExtractSignatureImpl { +}; +template +struct ExtractSignatureImpl { + using signature = void(Args...); +}; +template +struct ExtractSignatureImpl { + using signature = void(Args...); +}; +template +struct ExtractSignatureImpl { + using signature = void(Args...); +}; +template +struct ExtractSignatureImpl { + using signature = void(Args...); +}; +template +struct ExtractSignatureImpl { + using signature = void(Args...); +}; +template +struct ExtractSignatureImpl { + using signature = void(Args...); +}; +template +struct ExtractSignatureImpl { + using signature = void(Args...); +}; +template +struct ExtractSignatureImpl { + using signature = void(Args...); +}; +// Note: no && specializations since util::UniqueFunction doesn't support them, so you can't construct an +// EventLoopDispatcher from something with that anyway. + +template +using ExtractSignature = typename ExtractSignatureImpl::signature; +} // namespace _impl::ForEventLoopDispatcher + +// Deduction guide for function pointers. +template +EventLoopDispatcher(std::shared_ptr, void (*)(Args...)) -> EventLoopDispatcher; + +// Deduction guide for callable objects, such as lambdas. Only supports types with a non-overloaded, non-templated +// call operator, so no polymorphic (auto argument) lambdas. +template > +EventLoopDispatcher(std::shared_ptr, const T&) -> EventLoopDispatcher; + +struct DispatchFreeUserdata { + const std::shared_ptr m_scheduler; + realm_free_userdata_func_t m_func; + DispatchFreeUserdata(std::shared_ptr scheduler, realm_free_userdata_func_t func = nullptr) + : m_scheduler(scheduler), m_func(func) + { + } + void operator()(void* ptr) + { + if (m_func) { + m_scheduler->invoke([func = m_func, ptr]() { + func(ptr); + }); + } + } +}; + +} // namespace realm::util + +#endif diff --git a/src/realm_dart.h b/src/realm_dart.h index dc4462669d..26acb9702d 100644 --- a/src/realm_dart.h +++ b/src/realm_dart.h @@ -35,5 +35,4 @@ RLM_API void* object_to_persistent_handle(Dart_Handle handle); RLM_API Dart_Handle persistent_handle_to_object(void* handle); RLM_API void delete_persistent_handle(void* handle); - #endif // REALM_DART_H \ No newline at end of file diff --git a/src/subscription_set.cpp b/src/subscription_set.cpp new file mode 100644 index 0000000000..5107781a30 --- /dev/null +++ b/src/subscription_set.cpp @@ -0,0 +1,59 @@ +//////////////////////////////////////////////////////////////////////////////// +// +// Copyright 2022 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////////// + +#define REALM_ENABLE_SYNC 1 + +#include "subscription_set.h" + +#include + +#include +#include +#include + +#include "event_loop_dispatcher.hpp" + +namespace realm::c_api { + +using namespace realm::sync; + +RLM_API bool realm_dart_sync_on_subscription_set_state_change_async( + const realm_flx_sync_subscription_set_t* subscription_set, + realm_flx_sync_subscription_set_state_e notify_when, + realm_dart_sync_on_subscription_state_changed callback, + void* userdata, + realm_free_userdata_func_t userdata_free, + realm_scheduler_t* scheduler) noexcept +{ + return wrap_err([&]() { + auto future_state = subscription_set->get_state_change_notification(SubscriptionSet::State{notify_when}); + std::move(future_state) + .get_async([callback, scheduler, userdata = SharedUserdata(userdata, util::DispatchFreeUserdata(*scheduler, userdata_free))](const StatusWith& state) -> void { + auto cb = util::EventLoopDispatcher{*scheduler, callback}; + if (state.is_ok()) { + cb(userdata.get(), realm_flx_sync_subscription_set_state_e(static_cast(state.get_value()))); + } + else { + cb(userdata.get(), realm_flx_sync_subscription_set_state_e::RLM_SYNC_SUBSCRIPTION_ERROR); + } + }); + return true; + }); +} + +} // namespace realm::c_api diff --git a/src/subscription_set.h b/src/subscription_set.h new file mode 100644 index 0000000000..003f0d0085 --- /dev/null +++ b/src/subscription_set.h @@ -0,0 +1,45 @@ +//////////////////////////////////////////////////////////////////////////////// +// +// Copyright 2022 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////////// + +#ifndef REALM_DART_SUBSCRIPTION_SET_H +#define REALM_DART_SUBSCRIPTION_SET_H + +#include "realm.h" + +typedef void (*realm_dart_sync_on_subscription_state_changed)(void* userdata, + realm_flx_sync_subscription_set_state_e state); + +/** + * Register a handler in order to be notified when subscription set is equal to the one passed as parameter + * This is an asynchronous operation. + * + * @return true/false if the handler was registered correctly + * + * This is dart specific version of realm_dart_on_subscription_set_state_change_async. + * Unlike the original method, this one uses event_loop_dispatcher to ensure the callback + * is handled on the correct isolate thread. + */ +RLM_API bool +realm_dart_sync_on_subscription_set_state_change_async(const realm_flx_sync_subscription_set_t* subscription_set, + realm_flx_sync_subscription_set_state_e notify_when, + realm_dart_sync_on_subscription_state_changed callback, + void* userdata, + realm_free_userdata_func_t userdata_free, + realm_scheduler_t* scheduler) RLM_API_NOEXCEPT; + +#endif // REALM_DART_SUBSCRIPTION_SET_H \ No newline at end of file diff --git a/test/subscription_test.dart b/test/subscription_test.dart index 84d68ce6ae..f8ee728ed4 100644 --- a/test/subscription_test.dart +++ b/test/subscription_test.dart @@ -66,9 +66,8 @@ Future main([List? args]) async { expect(realm.subscriptions, isEmpty); // expect(realm.subscriptions!.findByName(name), isNull); - realm.subscriptions!.waitForStateChange(SubscriptionSetState.complete); + await realm.subscriptions!.waitForStateChange(SubscriptionSetState.complete); realm.close(); - app.logout(user); }); } diff --git a/test/test.dart b/test/test.dart index 5e0d027719..c26b488cd0 100644 --- a/test/test.dart +++ b/test/test.dart @@ -232,7 +232,6 @@ Future baasTest( skip = skip || url == null ? "BAAS URL not present" : false; } - print('skip: $skip'); test(name, () async { final app = baasApps[appName] ?? baasApps.values.firstWhere((element) => true, orElse: () => throw RealmError("No BAAS apps")); final temporaryDir = await Directory.systemTemp.createTemp('realm_test_'); @@ -241,7 +240,6 @@ Future baasTest( baseUrl: url, baseFilePath: temporaryDir, ); - print('test'); return await testFunction(appConfig); }, skip: skip); }