From 465e3bd50f6d8f732847ead56ab2a5de018fb70e Mon Sep 17 00:00:00 2001 From: Nikola Irinchev Date: Thu, 14 Mar 2024 21:11:45 +0100 Subject: [PATCH] RDART-972: Prepare breaking change for progress notifications (#1563) * Prepare breaking change for progress notifications * Fix async open tests --- CHANGELOG.md | 1 + packages/realm_dart/lib/src/session.dart | 29 +++++++++------ packages/realm_dart/test/realm_test.dart | 14 ++++---- packages/realm_dart/test/session_test.dart | 41 +++++++--------------- 4 files changed, 39 insertions(+), 46 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5766b380c..54b019f19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ Realm.shutdown(); } ``` +* `SyncProgress.transferredBytes` and `SyncProgress.transferableBytes` have been consolidated into `SyncProgress.progressEstimate`. The values reported previously were incorrect and did not accurately represent bytes either. The new field better conveys the uncertainty around the progress being reported. With this release, we're reporting accurate estimates for upload progress, but estimating downloads is still unreliable. A future server and SDK release will add better estimations for download progress. (Issue [#1562](https://github.com/realm/realm-dart/issues/1562)) ### Enhancements * Realm objects can now be serialized as [EJSON](https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/) diff --git a/packages/realm_dart/lib/src/session.dart b/packages/realm_dart/lib/src/session.dart index e8da07a88..8d1ade90b 100644 --- a/packages/realm_dart/lib/src/session.dart +++ b/packages/realm_dart/lib/src/session.dart @@ -67,16 +67,23 @@ class Session implements Finalizable { /// A type containing information about the progress state at a given instant. class SyncProgress { - /// The number of bytes that have been transferred since subscribing for progress notifications. - final int transferredBytes; - - /// The total number of bytes that have to be transferred since subscribing for progress notifications. - /// The difference between that number and [transferredBytes] gives you the number of bytes not yet - /// transferred. If the difference is 0, then all changes at the instant the callback fires have been - /// successfully transferred. - final int transferableBytes; + /// A value between 0.0 and 1.0 representing the estimated transfer progress. This value is precise for + /// uploads, but will be based on historical data and certain heuristics applied by the server for downloads. + /// + /// Whenever the progress reporting mode is [ProgressMode.forCurrentlyOutstandingWork], that value + /// will monotonically increase until it reaches 1.0. If the progress mode is [ProgressMode.reportIndefinitely], the + /// value may either increase or decrease as new data needs to be transferred. + final double progressEstimate; + + const SyncProgress._({required this.progressEstimate}); + + static double _calculateProgress({required int transferred, required int transferable}) { + if (transferable == 0 || transferred > transferable) { + return 1; + } - const SyncProgress({required this.transferredBytes, required this.transferableBytes}); + return transferred / transferable; + } } /// A type containing information about the transition of a connection state from one value to another. @@ -111,7 +118,7 @@ extension SessionInternal on Session { } static SyncProgress createSyncProgress(int transferredBytes, int transferableBytes) => - SyncProgress(transferredBytes: transferredBytes, transferableBytes: transferableBytes); + SyncProgress._(progressEstimate: SyncProgress._calculateProgress(transferred: transferredBytes, transferable: transferableBytes)); } abstract interface class ProgressNotificationsController { @@ -136,7 +143,7 @@ class SessionProgressNotificationsController implements ProgressNotificationsCon @override void onProgress(int transferredBytes, int transferableBytes) { - _streamController.add(SyncProgress(transferredBytes: transferredBytes, transferableBytes: transferableBytes)); + _streamController.add(SyncProgress._(progressEstimate: SyncProgress._calculateProgress(transferred: transferredBytes, transferable: transferableBytes))); if (transferredBytes >= transferableBytes && _mode == ProgressMode.forCurrentlyOutstandingWork) { _streamController.close(); diff --git a/packages/realm_dart/test/realm_test.dart b/packages/realm_dart/test/realm_test.dart index 188b59696..c1544c87c 100644 --- a/packages/realm_dart/test/realm_test.dart +++ b/packages/realm_dart/test/realm_test.dart @@ -1328,17 +1328,17 @@ void main() { final user = await app.logIn(credentials); final configuration = Configuration.flexibleSync(user, getSyncSchema()); - int transferredBytes = -1; + double progress = -1; final completer = Completer(); var syncedRealm = await getRealmAsync(configuration, onProgressCallback: (syncProgress) { - transferredBytes = syncProgress.transferredBytes; - if (syncProgress.transferredBytes == syncProgress.transferableBytes) { + progress = syncProgress.progressEstimate; + if (syncProgress.progressEstimate == 1.0) { completer.complete(); } }); completer.future.timeout(Duration(milliseconds: 300), onTimeout: () => throw Exception("onProgressCallback did not happen.")); expect(syncedRealm.isClosed, false); - expect(transferredBytes, greaterThan(-1)); + expect(progress, greaterThan(-1)); }); baasTest('Realm.open (flexibleSync) - download a populated realm', (appConfiguration) async { @@ -1357,16 +1357,16 @@ void main() { final config = await _subscribeForAtlasAddedData(app); int printCount = 0; - int transferredBytes = 0; + double progress = 0; final syncedRealm = await getRealmAsync(config, onProgressCallback: (syncProgress) { printCount++; - transferredBytes = syncProgress.transferredBytes; + progress = syncProgress.progressEstimate; }); expect(syncedRealm.isClosed, false); expect(printCount, isNot(0)); - expect(transferredBytes, greaterThan(19)); //19 bytes is the empty realm + expect(progress, 1.0); }); baasTest('Realm.open (flexibleSync) - listen and cancel download progress of a populated realm', (appConfiguration) async { diff --git a/packages/realm_dart/test/session_test.dart b/packages/realm_dart/test/session_test.dart index 3a585490d..470ca869b 100644 --- a/packages/realm_dart/test/session_test.dart +++ b/packages/realm_dart/test/session_test.dart @@ -162,20 +162,11 @@ void main() { final data = StreamProgressData(); final stream = realm.syncSession.getProgressStream(direction, mode); data.subscription = stream.listen((event) { - expect(event.transferredBytes, greaterThanOrEqualTo(data.transferredBytes)); - if (data.transferableBytes != 0) { - // We need to wait for the first event to store the total bytes we expect. - if (mode == ProgressMode.forCurrentlyOutstandingWork) { - // Transferable should not change after the first event - expect(event.transferableBytes, data.transferableBytes); - } else { - // For indefinite progress, we expect the transferable bytes to not decrease - expect(event.transferableBytes, greaterThanOrEqualTo(data.transferableBytes)); - } + if (mode == ProgressMode.forCurrentlyOutstandingWork) { + expect(event.progressEstimate, greaterThanOrEqualTo(data.progressEstimate)); } - data.transferredBytes = event.transferredBytes; - data.transferableBytes = event.transferableBytes; + data.progressEstimate = event.progressEstimate; data.callbacksInvoked++; }); @@ -191,12 +182,11 @@ void main() { await Future.delayed(const Duration(milliseconds: 100)); expect(data.callbacksInvoked, greaterThan(0)); - expect(data.transferableBytes, greaterThan(0)); - expect(data.transferredBytes, greaterThan(0)); + expect(data.progressEstimate, greaterThan(0)); if (expectDone) { - expect(data.transferredBytes, data.transferableBytes); + expect(data.progressEstimate, 1.0); } else { - expect(data.transferredBytes, lessThanOrEqualTo(data.transferableBytes)); + expect(data.progressEstimate, lessThanOrEqualTo(1.0)); } expect(data.doneInvoked, expectDone); } @@ -245,9 +235,11 @@ void main() { await realmA.syncSession.waitForUpload(); await validateData(uploadData); + expect(uploadData.progressEstimate, 1.0); await realmB.syncSession.waitForDownload(); await validateData(downloadData); + expect(downloadData.progressEstimate, 1.0); // Snapshot the current state, then add a new object. We should receive more notifications final uploadSnapshot = StreamProgressData.snapshot(uploadData); @@ -261,14 +253,12 @@ void main() { await realmB.syncSession.waitForDownload(); await validateData(uploadData); + expect(uploadData.progressEstimate, 1.0); await validateData(downloadData); + expect(downloadData.progressEstimate, 1.0); - expect(uploadData.transferredBytes, greaterThan(uploadSnapshot.transferredBytes)); - expect(uploadData.transferableBytes, greaterThan(uploadSnapshot.transferableBytes)); expect(uploadData.callbacksInvoked, greaterThan(uploadSnapshot.callbacksInvoked)); - expect(downloadData.transferredBytes, greaterThan(downloadSnapshot.transferredBytes)); - expect(downloadData.transferableBytes, greaterThan(downloadSnapshot.transferableBytes)); expect(downloadData.callbacksInvoked, greaterThan(downloadSnapshot.callbacksInvoked)); await uploadData.subscription.cancel(); @@ -329,18 +319,13 @@ void main() { } class StreamProgressData { - int transferredBytes; - int transferableBytes; + double progressEstimate; int callbacksInvoked; bool doneInvoked; late StreamSubscription subscription; - StreamProgressData({this.transferableBytes = 0, this.transferredBytes = 0, this.callbacksInvoked = 0, this.doneInvoked = false}); + StreamProgressData({this.progressEstimate = 0, this.callbacksInvoked = 0, this.doneInvoked = false}); StreamProgressData.snapshot(StreamProgressData other) - : this( - transferableBytes: other.transferableBytes, - callbacksInvoked: other.callbacksInvoked, - doneInvoked: other.doneInvoked, - transferredBytes: other.transferredBytes); + : this(callbacksInvoked: other.callbacksInvoked, doneInvoked: other.doneInvoked, progressEstimate: other.progressEstimate); }