Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

realm open async #1369

Merged
merged 51 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
bc7650c
realm open async
blagoev Aug 4, 2023
d68effe
don't subscribe twice
blagoev Aug 4, 2023
5f19fd3
Merge branch 'main' into blagoev/realmOpenAsync
blagoev Aug 7, 2023
97db98c
filter tests on CI
blagoev Aug 8, 2023
b8c37ea
add changelog
blagoev Aug 9, 2023
3b85c77
fix native struct sizes
blagoev Aug 9, 2023
4df83a5
revert test filter
blagoev Aug 10, 2023
df11bdc
remove null check
blagoev Aug 10, 2023
6ef613c
revert null check. (fooled by the analyzer)
blagoev Aug 10, 2023
7dd5ac1
don't use broadcast
blagoev Aug 10, 2023
94a81d1
check token isCancelled inside CancellableFuture callback
blagoev Aug 10, 2023
897f971
force rebuild
blagoev Aug 10, 2023
15916eb
revert cache.
blagoev Aug 10, 2023
bc9f4f9
force release
blagoev Aug 10, 2023
6c4b676
only linux ci
blagoev Aug 11, 2023
3b71a48
filter tests
blagoev Aug 11, 2023
8246a11
only linux
blagoev Aug 11, 2023
c009104
1
blagoev Aug 11, 2023
abe4351
only linux
blagoev Aug 11, 2023
7a97898
2
blagoev Aug 11, 2023
6f23cd5
3
blagoev Aug 11, 2023
53178ad
3
blagoev Aug 11, 2023
ae389bb
4
blagoev Aug 11, 2023
1eef8b8
5
blagoev Aug 11, 2023
ab67fca
6
blagoev Aug 11, 2023
d9e46a7
1
blagoev Aug 11, 2023
249beac
7
blagoev Aug 11, 2023
3bbb86a
8
blagoev Aug 11, 2023
986ce1c
9
blagoev Aug 11, 2023
64ddc2a
10
blagoev Aug 11, 2023
9647c2e
1
blagoev Aug 11, 2023
07b3289
12
blagoev Aug 11, 2023
1c91f9a
11
blagoev Aug 11, 2023
0db6613
124
blagoev Aug 11, 2023
ba1f6fd
1
blagoev Aug 11, 2023
a6902fc
12 + custom realm-core
blagoev Aug 11, 2023
1427472
23
blagoev Aug 11, 2023
c00b503
34564
blagoev Aug 11, 2023
3738379
fixed core
blagoev Aug 11, 2023
04d98c5
345
blagoev Aug 11, 2023
b960065
334344
blagoev Aug 11, 2023
3a0360f
23
blagoev Aug 11, 2023
eaf0d45
fix open asyn callback
blagoev Aug 11, 2023
94d7fb0
3434
blagoev Aug 11, 2023
c0e9465
234234
blagoev Aug 11, 2023
51941d6
await inside
blagoev Aug 11, 2023
1f55457
remove debug statements
blagoev Aug 11, 2023
3fc0e9a
revert debug stms
blagoev Aug 11, 2023
1130bc2
revert CI
blagoev Aug 11, 2023
37c3732
revert CI
blagoev Aug 11, 2023
fee69be
revert Core
blagoev Aug 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* Dart ^3.0.2 and Flutter ^3.10.2

### Internal
* Synced realms will use async open to prevent overloading the server with schema updates. [#1369](https://github.com/realm/realm-dart/pull/1369))
* Using Core 13.15.1

## 1.2.0 (2023-06-08)
Expand Down
34 changes: 34 additions & 0 deletions lib/src/native/realm_bindings.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3129,6 +3129,32 @@ class RealmLibrary {
ffi.Pointer<realm_thread_safe_reference_t> Function(
ffi.Pointer<ffi.Void>)>();

void realm_dart_async_open_task_callback(
ffi.Pointer<ffi.Void> userdata,
ffi.Pointer<realm_thread_safe_reference_t> realm,
ffi.Pointer<realm_async_error_t> error,
) {
return _realm_dart_async_open_task_callback(
userdata,
realm,
error,
);
}

late final _realm_dart_async_open_task_callbackPtr = _lookup<
ffi.NativeFunction<
ffi.Void Function(
ffi.Pointer<ffi.Void>,
ffi.Pointer<realm_thread_safe_reference_t>,
ffi.Pointer<realm_async_error_t>)>>(
'realm_dart_async_open_task_callback');
late final _realm_dart_async_open_task_callback =
_realm_dart_async_open_task_callbackPtr.asFunction<
void Function(
ffi.Pointer<ffi.Void>,
ffi.Pointer<realm_thread_safe_reference_t>,
ffi.Pointer<realm_async_error_t>)>();

ffi.Pointer<realm_scheduler_t> realm_dart_create_scheduler(
int isolateId,
int port,
Expand Down Expand Up @@ -10819,6 +10845,14 @@ class RealmLibrary {
class _SymbolAddresses {
final RealmLibrary _library;
_SymbolAddresses(this._library);
ffi.Pointer<
ffi.NativeFunction<
ffi.Void Function(
ffi.Pointer<ffi.Void>,
ffi.Pointer<realm_thread_safe_reference_t>,
ffi.Pointer<realm_async_error_t>)>>
get realm_dart_async_open_task_callback =>
_library._realm_dart_async_open_task_callbackPtr;
ffi.Pointer<
ffi.NativeFunction<
ffi.Pointer<realm_scheduler_t> Function(ffi.Uint64, Dart_Port)>>
Expand Down
105 changes: 85 additions & 20 deletions lib/src/native/realm_core.dart
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,7 @@ class _RealmCore {
return null;
}

final message = error.ref.message.cast<Utf8>().toRealmDartString();
Object? userError;
if (error.ref.usercode_error != nullptr) {
userError = error.ref.usercode_error.toObject(isPersistent: true);
_realmLib.realm_dart_delete_persistent_handle(error.ref.usercode_error);
}

return LastError(error.ref.error, message, userError);
return error.ref.toLastError();
}

void throwLastError([String? errorMessage]) {
Expand Down Expand Up @@ -626,8 +619,8 @@ class _RealmCore {
}

final beforeRealm = RealmInternal.getUnowned(syncConfig, RealmHandle._unowned(beforeHandle));
final afterRealm =
RealmInternal.getUnowned(syncConfig, RealmHandle._unowned(_realmLib.realm_from_thread_safe_reference(afterReference, scheduler.handle._pointer)));
final realmPtr = _realmLib.invokeGetPointer(() => _realmLib.realm_from_thread_safe_reference(afterReference, scheduler.handle._pointer));
blagoev marked this conversation as resolved.
Show resolved Hide resolved
final afterRealm = RealmInternal.getUnowned(syncConfig, RealmHandle._unowned(realmPtr));

try {
return await afterResetCallback(beforeRealm, afterRealm);
Expand Down Expand Up @@ -664,6 +657,61 @@ class _RealmCore {
return RealmHandle._(realmPtr);
}

RealmAsyncOpenTaskHandle createRealmAsyncOpenTask(FlexibleSyncConfiguration config) {
final configHandle = _createConfig(config);
final asyncOpenTaskPtr =
_realmLib.invokeGetPointer(() => _realmLib.realm_open_synchronized(configHandle._pointer), "Error opening realm at path ${config.path}");
return RealmAsyncOpenTaskHandle._(asyncOpenTaskPtr);
}

Future<RealmHandle> openRealmAsync(RealmAsyncOpenTaskHandle handle, CancellationToken? cancellationToken) {
final completer = CancellableCompleter<RealmHandle>(cancellationToken);
blagoev marked this conversation as resolved.
Show resolved Hide resolved
final callback =
Pointer.fromFunction<Void Function(Handle, Pointer<realm_thread_safe_reference> realm, Pointer<realm_async_error_t> error)>(_openRealmAsyncCallback);
final userData = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer);
_realmLib.realm_async_open_task_start(
handle._pointer,
_realmLib.addresses.realm_dart_async_open_task_callback,
userData.cast(),
_realmLib.addresses.realm_dart_userdata_async_free,
);

return completer.future;
}

static void _openRealmAsyncCallback(Object userData, Pointer<realm_thread_safe_reference> realmSafePtr, Pointer<realm_async_error_t> error) {
return using((Arena arena) {
final completer = userData as Completer<RealmHandle>;
blagoev marked this conversation as resolved.
Show resolved Hide resolved

if (error != nullptr) {
final err = arena<realm_error>();
_realmLib.realm_get_async_error(error, err);
completer.completeError(RealmException("Failed to open realm ${err.ref.toLastError().toString()}"));
return;
}

final realmPtr = _realmLib.invokeGetPointer(() => _realmLib.realm_from_thread_safe_reference(realmSafePtr, scheduler.handle._pointer));
completer.complete(RealmHandle._(realmPtr));
});
}

void cancelOpenRealmAsync(RealmAsyncOpenTaskHandle handle) {
_realmLib.realm_async_open_task_cancel(handle._pointer);
}

RealmAsyncOpenTaskProgressNotificationTokenHandle realmAsyncOpenRegisterAsyncOpenProgressNotifier(
RealmAsyncOpenTaskHandle handle, RealmAsyncOpenProgressNotificationsController controller) {
final callback = Pointer.fromFunction<Void Function(Handle, Uint64, Uint64)>(_syncProgressCallback);
final userdata = _realmLib.realm_dart_userdata_async_new(controller, callback.cast(), scheduler.handle._pointer);
final tokenPtr = _realmLib.invokeGetPointer(() => _realmLib.realm_async_open_task_register_download_progress_notifier(
handle._pointer,
_realmLib.addresses.realm_dart_sync_progress_callback,
userdata.cast(),
_realmLib.addresses.realm_dart_userdata_async_free,
));
return RealmAsyncOpenTaskProgressNotificationTokenHandle._(tokenPtr);
}

RealmSchema readSchema(Realm realm) {
return using((Arena arena) {
return _readSchema(realm, arena);
Expand Down Expand Up @@ -1777,10 +1825,6 @@ class _RealmCore {
return using((arena) {
final handle = SyncClientConfigHandle._(_realmLib.realm_sync_client_config_new());

// TODO: Remove later
// Disable multiplexing for now due to: https://github.com/realm/realm-core/issues/6656
_realmLib.realm_sync_client_config_set_multiplex_sessions(handle._pointer, false);
// <-- end
_realmLib.realm_sync_client_config_set_base_file_path(handle._pointer, configuration.baseFilePath.path.toCharPtr(arena));
_realmLib.realm_sync_client_config_set_metadata_mode(handle._pointer, configuration.metadataPersistenceMode.index);
_realmLib.realm_sync_client_config_set_connect_timeout(handle._pointer, configuration.maxConnectionTimeout.inMilliseconds);
Expand Down Expand Up @@ -2228,20 +2272,20 @@ class _RealmCore {
RealmSyncSessionConnectionStateNotificationTokenHandle sessionRegisterProgressNotifier(
Session session, ProgressDirection direction, ProgressMode mode, SessionProgressNotificationsController controller) {
final isStreaming = mode == ProgressMode.reportIndefinitely;
final callback = Pointer.fromFunction<Void Function(Handle, Uint64, Uint64)>(_progressCallback);
final callback = Pointer.fromFunction<Void Function(Handle, Uint64, Uint64)>(_syncProgressCallback);
final userdata = _realmLib.realm_dart_userdata_async_new(controller, callback.cast(), scheduler.handle._pointer);
final notification_token = _realmLib.realm_sync_session_register_progress_notifier(
final tokenPtr = _realmLib.invokeGetPointer(() => _realmLib.realm_sync_session_register_progress_notifier(
session.handle._pointer,
_realmLib.addresses.realm_dart_sync_progress_callback,
direction.index,
isStreaming,
userdata.cast(),
_realmLib.addresses.realm_dart_userdata_async_free);
return RealmSyncSessionConnectionStateNotificationTokenHandle._(notification_token);
_realmLib.addresses.realm_dart_userdata_async_free));
return RealmSyncSessionConnectionStateNotificationTokenHandle._(tokenPtr);
}

static void _progressCallback(Object userdata, int transferred, int transferable) {
final controller = userdata as SessionProgressNotificationsController;
static void _syncProgressCallback(Object userdata, int transferred, int transferable) {
final controller = userdata as ProgressNotificationsController;

controller.onProgress(transferred, transferable);
}
Expand Down Expand Up @@ -2845,6 +2889,14 @@ class SubscriptionHandle extends HandleBase<realm_flx_sync_subscription> {
SubscriptionHandle._(Pointer<realm_flx_sync_subscription> pointer) : super(pointer, 184);
}

class RealmAsyncOpenTaskHandle extends HandleBase<realm_async_open_task_t> {
RealmAsyncOpenTaskHandle._(Pointer<realm_async_open_task_t> pointer) : super(pointer, 32);
}

class RealmAsyncOpenTaskProgressNotificationTokenHandle extends HandleBase<realm_async_open_task_progress_notification_token_t> {
RealmAsyncOpenTaskProgressNotificationTokenHandle._(Pointer<realm_async_open_task_progress_notification_token_t> pointer) : super(pointer, 40);
}

class SubscriptionSetHandle extends RootedHandleBase<realm_flx_sync_subscription_set> {
@override
bool get shouldRoot => true;
Expand Down Expand Up @@ -3350,3 +3402,16 @@ class SyncErrorDetails {
this.compensatingWrites,
});
}

extension on realm_error {
LastError toLastError() {
final message = this.message.cast<Utf8>().toRealmDartString();
Object? userError;
if (error == realm_errno.RLM_ERR_CALLBACK && usercode_error != nullptr) {
blagoev marked this conversation as resolved.
Show resolved Hide resolved
userError = usercode_error.toObject(isPersistent: true);
_realmLib.realm_dart_delete_persistent_handle(usercode_error);
}

return LastError(error, message, userError);
}
}
82 changes: 65 additions & 17 deletions lib/src/realm_class.dart
Original file line number Diff line number Diff line change
Expand Up @@ -173,31 +173,47 @@ class Realm implements Finalizable {
if (cancellationToken != null && cancellationToken.isCancelled) {
throw cancellationToken.exception!;
}
final realm = Realm(config);
StreamSubscription<SyncProgress>? subscription;
try {
if (config is FlexibleSyncConfiguration) {
final session = realm.syncSession;
if (onProgressCallback != null) {
subscription = session.getProgressStream(ProgressDirection.download, ProgressMode.forCurrentlyOutstandingWork).listen(onProgressCallback);
}
await session.waitForDownload(cancellationToken);
await subscription?.cancel();
}
} catch (_) {
await subscription?.cancel();
realm.close();
rethrow;

if (config is! FlexibleSyncConfiguration) {
final realm = Realm(config);
return await CancellableFuture.value(realm, cancellationToken);
}
return await CancellableFuture.value(realm, cancellationToken);

_ensureDirectory(config);

final asyncOpenHandle = realmCore.createRealmAsyncOpenTask(config);
return await CancellableFuture.from<Realm>(() async {
if (cancellationToken != null && cancellationToken.isCancelled) {
throw cancellationToken.exception!;
}

StreamSubscription<SyncProgress>? progressSubscription;
if (onProgressCallback != null) {
final progressController = RealmAsyncOpenProgressNotificationsController._(asyncOpenHandle);
final progressStream = progressController.createStream();
progressSubscription = progressStream.listen(onProgressCallback);
blagoev marked this conversation as resolved.
Show resolved Hide resolved
}

late final RealmHandle realmHandle;
try {
realmHandle = await realmCore.openRealmAsync(asyncOpenHandle, cancellationToken);
return Realm._(config, realmHandle);
} finally {
await progressSubscription?.cancel();
}
}, cancellationToken, onCancel: () => realmCore.cancelOpenRealmAsync(asyncOpenHandle));
}

static RealmHandle _openRealm(Configuration config) {
_ensureDirectory(config);
return realmCore.openRealm(config);
}

static void _ensureDirectory(Configuration config) {
var dir = File(config.path).parent;
if (!dir.existsSync()) {
dir.createSync(recursive: true);
}
return realmCore.openRealm(config);
}

void _populateMetadata() {
Expand Down Expand Up @@ -972,3 +988,35 @@ class MigrationRealm extends DynamicRealm {
/// * syncProgress - an object of [SyncProgress] that contains `transferredBytes` and `transferableBytes`.
/// {@category Realm}
typedef ProgressCallback = void Function(SyncProgress syncProgress);

/// @nodoc
class RealmAsyncOpenProgressNotificationsController implements ProgressNotificationsController {
final RealmAsyncOpenTaskHandle _handle;
RealmAsyncOpenTaskProgressNotificationTokenHandle? _tokenHandle;
late final StreamController<SyncProgress> _streamController;

RealmAsyncOpenProgressNotificationsController._(this._handle);

Stream<SyncProgress> createStream() {
_streamController = StreamController<SyncProgress>(onListen: _start, onCancel: _stop);
return _streamController.stream;
}

@override
void onProgress(int transferredBytes, int transferableBytes) {
_streamController.add(SessionInternal.createSyncProgress(transferredBytes, transferableBytes));
}

void _start() {
if (_tokenHandle != null) {
throw RealmStateError("Progress subscription already started.");
}

_tokenHandle = realmCore.realmAsyncOpenRegisterAsyncOpenProgressNotifier(_handle, this);
}

void _stop() {
_tokenHandle?.release();
_tokenHandle = null;
}
}
24 changes: 15 additions & 9 deletions lib/src/session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class SyncProgress {
/// successfully transferred.
final int transferableBytes;

SyncProgress._(this.transferredBytes, this.transferableBytes);
const SyncProgress._(this.transferredBytes, this.transferableBytes);
}

/// A type containing information about the transition of a connection state from one value to another.
Expand Down Expand Up @@ -124,15 +124,21 @@ extension SessionInternal on Session {
void raiseError(SyncErrorCategory category, int errorCode, bool isFatal) {
realmCore.raiseError(this, category, errorCode, isFatal);
}

static SyncProgress createSyncProgress(int transferredBytes, int transferableBytes) => SyncProgress._(transferredBytes, transferableBytes);
}

abstract interface class ProgressNotificationsController {
void onProgress(int transferredBytes, int transferableBytes);
}

/// @nodoc
class SessionProgressNotificationsController {
class SessionProgressNotificationsController implements ProgressNotificationsController {
final Session _session;
final ProgressDirection _direction;
final ProgressMode _mode;

RealmSyncSessionConnectionStateNotificationTokenHandle? _token;
RealmSyncSessionConnectionStateNotificationTokenHandle? _tokenHandle;
late final StreamController<SyncProgress> _streamController;

SessionProgressNotificationsController(this._session, this._direction, this._mode);
Expand All @@ -142,6 +148,7 @@ class SessionProgressNotificationsController {
return _streamController.stream;
}

@override
void onProgress(int transferredBytes, int transferableBytes) {
_streamController.add(SyncProgress._(transferredBytes, transferableBytes));

Expand All @@ -151,15 +158,15 @@ class SessionProgressNotificationsController {
}

void _start() {
if (_token != null) {
throw RealmStateError("Session progress subscription already started");
if (_tokenHandle != null) {
throw RealmStateError("Session progress subscription already started.");
}
_token = realmCore.sessionRegisterProgressNotifier(_session, _direction, _mode, this);
_tokenHandle = realmCore.sessionRegisterProgressNotifier(_session, _direction, _mode, this);
}

void _stop() {
_token?.release();
_token = null;
_tokenHandle?.release();
_tokenHandle = null;
}
}

Expand Down Expand Up @@ -591,7 +598,6 @@ enum SyncResolveErrorCode {
///
/// These errors will be reported via the error handlers of the affected sessions.
enum SyncWebSocketErrorCode {

/// Web socket resolution failed
websocketResolveFailed(4400),

Expand Down
8 changes: 8 additions & 0 deletions src/realm_dart_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,11 @@ RLM_API bool realm_dart_sync_after_reset_handler_callback(realm_userdata_t userd
};
return invoke_dart_and_await_result(&userCallback);
}

RLM_API void realm_dart_async_open_task_callback(realm_userdata_t userdata, realm_thread_safe_reference_t* realm, const realm_async_error_t* error)
{
auto ud = reinterpret_cast<realm_dart_userdata_async_t>(userdata);
ud->scheduler->invoke([ud, realm, error]() {
(reinterpret_cast<realm_async_open_task_completion_func_t>(ud->dart_callback))(ud->handle, realm, error);
});
}
Loading