Skip to content

Commit

Permalink
RDART-973: Add support for the new progress notifications (#1546)
Browse files Browse the repository at this point in the history
* Add support for the new progress notifications

* Add a sanity check

* ..

* wip

* Clean up tests

* Clean up

* Changelog, another test

* await the timeout future

* Use realm-core 14.8.0

* downgrade core

* disable failing test

* Rework test

* Fix core version

* Tweak test so it passes
  • Loading branch information
nirinchev authored Jun 25, 2024
1 parent b109d8c commit 52e3470
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 48 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/dart-desktop-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
shell: bash

- id: runner_os_lowercase
# there is no such thing as ${{ tolower(runner.os) }}, hence this abomination ¯\_(ツ)_/¯
# there is no such thing as ${{ tolower(runner.os) }}, hence this abomination ¯\_(ツ)_/¯
# use with steps.runner_os_lowercase.outputs.os
run: echo ${{ runner.os }} | awk '{print "os=" tolower($0)}' >> $GITHUB_OUTPUT
shell: bash
Expand All @@ -59,15 +59,15 @@ jobs:
ulimit -n 10240
if: ${{ contains(runner.os, 'macos') }}

- name: Run tests ${{ runner }} ${{ runner.arch }}
- name: Run tests ${{ runner.os }} ${{ runner.arch }}
run: melos test:unit

# TODO: Publish all reports
- name: Publish Test Report
uses: dorny/[email protected]
if: success() || failure()
with:
name: Test Results Dart ${{ runner }} ${{ runner.arch }}
name: Test Results Dart ${{ runner.os }} ${{ runner.arch }}
path: test-results.json
reporter: dart-json
only-summary: true
Expand Down
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
## vNext (TBD)

### Enhancements
* None
* The download progress estimate reported by `Session.getProgressStream` will now return meaningful estimated values, while previously it always returned 1. (Issue [#1564](https://github.com/realm/realm-dart/issues/1564))

### Fixed
* [sane_uuid](https://pub.dev/packages/sane_uuid) 1.0.0 was released, which has a few minor breaking change as compared to to 1.0.0-rc.5 that impact realm:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,5 +173,5 @@ void _onConnectionStateChange(Object userdata, int oldState, int newState) {
void syncProgressCallback(Object userdata, int transferred, int transferable, double estimate) {
final controller = userdata as ProgressNotificationsController;

controller.onProgress(transferred, transferable);
controller.onProgress(estimate);
}
4 changes: 2 additions & 2 deletions packages/realm_dart/lib/src/realm_class.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1016,8 +1016,8 @@ class RealmAsyncOpenProgressNotificationsController implements ProgressNotificat
}

@override
void onProgress(int transferredBytes, int transferableBytes) {
_streamController.add(SessionInternal.createSyncProgress(transferredBytes, transferableBytes));
void onProgress(double progressEstimate) {
_streamController.add(SessionInternal.createSyncProgress(progressEstimate));
}

void _start() {
Expand Down
21 changes: 6 additions & 15 deletions packages/realm_dart/lib/src/session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,7 @@ class SyncProgress {
/// value may either increase or decrease as new data needs to be transferred.
final double progressEstimate;

const SyncProgress._({required this.progressEstimate});

static double _calculateProgress({required int transferred, required int transferable}) {
if (transferable == 0 || transferred > transferable) {
return 1;
}

return transferred / transferable;
}
const SyncProgress({required this.progressEstimate});
}

/// A type containing information about the transition of a connection state from one value to another.
Expand All @@ -108,12 +100,11 @@ extension SessionInternal on Session {

void raiseError(int errorCode, bool isFatal) => handle.raiseError(errorCode, isFatal);

static SyncProgress createSyncProgress(int transferredBytes, int transferableBytes) =>
SyncProgress._(progressEstimate: SyncProgress._calculateProgress(transferred: transferredBytes, transferable: transferableBytes));
static SyncProgress createSyncProgress(double progressEstimate) => SyncProgress(progressEstimate: progressEstimate);
}

abstract interface class ProgressNotificationsController {
void onProgress(int transferredBytes, int transferableBytes);
void onProgress(double progressEstimate);
}

/// @nodoc
Expand All @@ -133,10 +124,10 @@ class SessionProgressNotificationsController implements ProgressNotificationsCon
}

@override
void onProgress(int transferredBytes, int transferableBytes) {
_streamController.add(SyncProgress._(progressEstimate: SyncProgress._calculateProgress(transferred: transferredBytes, transferable: transferableBytes)));
void onProgress(double progressEstimate) {
_streamController.add(SyncProgress(progressEstimate: progressEstimate));

if (transferredBytes >= transferableBytes && _mode == ProgressMode.forCurrentlyOutstandingWork) {
if (progressEstimate >= 1.0 && _mode == ProgressMode.forCurrentlyOutstandingWork) {
_streamController.close();
}
}
Expand Down
25 changes: 14 additions & 11 deletions packages/realm_dart/test/realm_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1337,18 +1337,21 @@ void main() {
final user = await app.logIn(credentials);
final configuration = Configuration.flexibleSync(user, getSyncSchema());

int count = 0;
double progress = -1;

double progressEstimate = -1;
bool progressReported = false;
var syncedRealm = await getRealmAsync(configuration, onProgressCallback: (syncProgress) {
count++;
progress = syncProgress.progressEstimate;
progressEstimate = syncProgress.progressEstimate;
progressReported = true;
});

await Future<void>.delayed(Duration(milliseconds: 500));

expect(syncedRealm.isClosed, false);
// Semantics of onProgressCallback changed with https://github.com/realm/realm-core/issues/7452
expect(count, 0);
expect(progress, -1);

// For FLX realms with no subscriptions, the server won't report any progress before it resolves the
// Realm.open future.
expect(progressEstimate, -1);
expect(progressReported, false);
});

baasTest('Realm.open (flexibleSync) - download a populated realm', (appConfiguration) async {
Expand All @@ -1367,16 +1370,16 @@ void main() {
final config = await _subscribeForAtlasAddedData(app);

int printCount = 0;
double progress = 0;
double progressEstimate = 0;

final syncedRealm = await getRealmAsync(config, onProgressCallback: (syncProgress) {
printCount++;
progress = syncProgress.progressEstimate;
progressEstimate = syncProgress.progressEstimate;
});

expect(syncedRealm.isClosed, false);
expect(printCount, isNot(0));
expect(progress, 1.0);
expect(progressEstimate, 1.0);
});

baasTest('Realm.open (flexibleSync) - listen and cancel download progress of a populated realm', (appConfiguration) async {
Expand Down
93 changes: 82 additions & 11 deletions packages/realm_dart/test/session_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ void main() {
StreamProgressData subscribeToProgress(Realm realm, ProgressDirection direction, ProgressMode mode) {
final data = StreamProgressData();
final stream = realm.syncSession.getProgressStream(direction, mode);

data.subscription = stream.listen((event) {
if (mode == ProgressMode.forCurrentlyOutstandingWork) {
expect(event.progressEstimate, greaterThanOrEqualTo(data.progressEstimate));
Expand Down Expand Up @@ -191,26 +192,97 @@ void main() {

baasTest('SyncSession.getProgressStream forCurrentlyOutstandingWork', (configuration) async {
final differentiator = ObjectId();
final realmA = await getIntegrationRealm(differentiator: differentiator);
final realmB = await getIntegrationRealm(differentiator: differentiator);
final uploadRealm = await getIntegrationRealm(differentiator: differentiator);

for (var i = 0; i < 10; i++) {
realmA.write(() {
realmA.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50)));
uploadRealm.write(() {
uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50)));
});
}

final uploadData = subscribeToProgress(realmA, ProgressDirection.upload, ProgressMode.forCurrentlyOutstandingWork);
final downloadData = subscribeToProgress(realmB, ProgressDirection.download, ProgressMode.forCurrentlyOutstandingWork);
final uploadData = subscribeToProgress(uploadRealm, ProgressDirection.upload, ProgressMode.forCurrentlyOutstandingWork);
await uploadRealm.syncSession.waitForUpload();
await validateData(uploadData, expectDone: true);

await realmA.syncSession.waitForUpload();
// Subscribe immediately after the upload to ensure we get the entire upload message as progress notifications
final downloadRealm = await getIntegrationRealm(differentiator: differentiator, waitForSync: false);
final downloadData = subscribeToProgress(downloadRealm, ProgressDirection.download, ProgressMode.forCurrentlyOutstandingWork);

await validateData(uploadData, expectDone: true);
await downloadRealm.subscriptions.waitForSynchronization();

await realmB.syncSession.waitForDownload();
await downloadRealm.syncSession.waitForDownload();

await validateData(downloadData, expectDone: true);

// We should not see more updates in either direction
final uploadCallbacks = uploadData.callbacksInvoked;
final downloadCallbacks = downloadData.callbacksInvoked;

uploadRealm.write(() {
uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50)));
});

await uploadRealm.syncSession.waitForUpload();
await downloadRealm.syncSession.waitForDownload();

expect(uploadRealm.all<NullableTypes>().length, downloadRealm.all<NullableTypes>().length);
expect(uploadData.callbacksInvoked, uploadCallbacks);
expect(downloadData.callbacksInvoked, downloadCallbacks);

await uploadData.subscription.cancel();
await downloadData.subscription.cancel();
});

baasTest('SyncSession.getProgressStream after reconnecting', (configuration) async {
final differentiator = ObjectId();
final uploadRealm = await getIntegrationRealm(differentiator: differentiator);

// Make sure we've caught up, then close the Realm. We'll reopen it later and verify that progress notifications
// are delivered. This is different from "SyncSession.getProgressStream forCurrentlyOutstandingWork" where we're
// testing notifications after change of query.
final user = await getIntegrationUser(appConfig: configuration);
final config = getIntegrationConfig(user);
var downloadRealm = getRealm(config);
downloadRealm.subscriptions.update((mutableSubscriptions) {
mutableSubscriptions.add(downloadRealm.query<NullableTypes>(r'differentiator = $0', [differentiator]));
});

await downloadRealm.subscriptions.waitForSynchronization();
downloadRealm.close();

for (var i = 0; i < 10; i++) {
uploadRealm.write(() {
uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50)));
});
}

final uploadData = subscribeToProgress(uploadRealm, ProgressDirection.upload, ProgressMode.forCurrentlyOutstandingWork);
await uploadRealm.syncSession.waitForUpload();
await validateData(uploadData, expectDone: true);

// Reopen the download realm and subscribe for notifications - those should still be delivered as normal.
downloadRealm = getRealm(getIntegrationConfig(user));
final downloadData = subscribeToProgress(downloadRealm, ProgressDirection.download, ProgressMode.reportIndefinitely);

await downloadRealm.syncSession.waitForDownload();

await validateData(downloadData, expectDone: false);

// We should not see more updates in upload direction, but should see a callback invoked for download
final uploadCallbacks = uploadData.callbacksInvoked;
final downloadCallbacks = downloadData.callbacksInvoked;

uploadRealm.write(() {
uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50)));
});

await uploadRealm.syncSession.waitForUpload();
await downloadRealm.syncSession.waitForDownload();

expect(uploadRealm.all<NullableTypes>().length, downloadRealm.all<NullableTypes>().length);
expect(uploadData.callbacksInvoked, uploadCallbacks);
expect(downloadData.callbacksInvoked, greaterThan(downloadCallbacks));

await uploadData.subscription.cancel();
await downloadData.subscription.cancel();
});
Expand Down Expand Up @@ -254,7 +326,6 @@ void main() {
expect(downloadData.progressEstimate, 1.0);

expect(uploadData.callbacksInvoked, greaterThan(uploadSnapshot.callbacksInvoked));

expect(downloadData.callbacksInvoked, greaterThan(downloadSnapshot.callbacksInvoked));

await uploadData.subscription.cancel();
Expand Down Expand Up @@ -319,7 +390,7 @@ class StreamProgressData {
bool doneInvoked;
late StreamSubscription<SyncProgress> subscription;

StreamProgressData({this.progressEstimate = 0, this.callbacksInvoked = 0, this.doneInvoked = false});
StreamProgressData({this.progressEstimate = -1, this.callbacksInvoked = 0, this.doneInvoked = false});

StreamProgressData.snapshot(StreamProgressData other)
: this(callbacksInvoked: other.callbacksInvoked, doneInvoked: other.doneInvoked, progressEstimate: other.progressEstimate);
Expand Down
15 changes: 11 additions & 4 deletions packages/realm_dart/test/test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ void setupTests() {

Realm.logger.setLogLevel(LogLevel.detail);
Realm.logger.onRecord.listen((record) {
printOnFailure('${record.category} ${record.level.name}: ${record.message}');
printOnFailure('${DateTime.now().toUtc()} ${record.category} ${record.level.name}: ${record.message}');
});

if (Platform.isIOS) {
Expand Down Expand Up @@ -618,21 +618,28 @@ Future<User> getAnonymousUser(App app) {
return app.logIn(Credentials.anonymous(reuseCredentials: false));
}

FlexibleSyncConfiguration getIntegrationConfig(User user) {
return Configuration.flexibleSync(user, getSyncSchema())..sessionStopPolicy = SessionStopPolicy.immediately;
}

/// Returns a synced realm after logging in a user.
///
/// A subscription for querying all [NullableTypes] objects containing
/// the `differentiator` will be added if a `differentiator` is provided.
Future<Realm> getIntegrationRealm({App? app, ObjectId? differentiator, AppConfiguration? appConfig}) async {
Future<Realm> getIntegrationRealm({App? app, ObjectId? differentiator, AppConfiguration? appConfig, bool waitForSync = true}) async {
app ??= App(appConfig ?? await baasHelper!.getAppConfig());
final user = await getIntegrationUser(app: app, appConfig: appConfig);

final config = Configuration.flexibleSync(user, getSyncSchema())..sessionStopPolicy = SessionStopPolicy.immediately;
final config = getIntegrationConfig(user);
final realm = getRealm(config);
if (differentiator != null) {
realm.subscriptions.update((mutableSubscriptions) {
mutableSubscriptions.add(realm.query<NullableTypes>(r'differentiator = $0', [differentiator]));
});

await realm.subscriptions.waitForSynchronization();
if (waitForSync) {
await realm.subscriptions.waitForSynchronization();
}
}

return realm;
Expand Down

0 comments on commit 52e3470

Please sign in to comment.