Skip to content

Commit

Permalink
RDART-972: Prepare breaking change for progress notifications (#1563)
Browse files Browse the repository at this point in the history
* Prepare breaking change for progress notifications

* Fix async open tests
  • Loading branch information
nirinchev authored Mar 14, 2024
1 parent 689d824 commit 465e3bd
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
Realm.shutdown();
}
```
* `SyncProgress.transferredBytes` and `SyncProgress.transferableBytes` have been consolidated into `SyncProgress.progressEstimate`. The values reported previously were incorrect and did not accurately represent bytes either. The new field better conveys the uncertainty around the progress being reported. With this release, we're reporting accurate estimates for upload progress, but estimating downloads is still unreliable. A future server and SDK release will add better estimations for download progress. (Issue [#1562](https://github.com/realm/realm-dart/issues/1562))

### Enhancements
* Realm objects can now be serialized as [EJSON](https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/)
Expand Down
29 changes: 18 additions & 11 deletions packages/realm_dart/lib/src/session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,23 @@ class Session implements Finalizable {

/// A type containing information about the progress state at a given instant.
class SyncProgress {
/// The number of bytes that have been transferred since subscribing for progress notifications.
final int transferredBytes;

/// The total number of bytes that have to be transferred since subscribing for progress notifications.
/// The difference between that number and [transferredBytes] gives you the number of bytes not yet
/// transferred. If the difference is 0, then all changes at the instant the callback fires have been
/// successfully transferred.
final int transferableBytes;
/// A value between 0.0 and 1.0 representing the estimated transfer progress. This value is precise for
/// uploads, but will be based on historical data and certain heuristics applied by the server for downloads.
///
/// Whenever the progress reporting mode is [ProgressMode.forCurrentlyOutstandingWork], that value
/// will monotonically increase until it reaches 1.0. If the progress mode is [ProgressMode.reportIndefinitely], the
/// value may either increase or decrease as new data needs to be transferred.
final double progressEstimate;

const SyncProgress._({required this.progressEstimate});

static double _calculateProgress({required int transferred, required int transferable}) {
if (transferable == 0 || transferred > transferable) {
return 1;
}

const SyncProgress({required this.transferredBytes, required this.transferableBytes});
return transferred / transferable;
}
}

/// A type containing information about the transition of a connection state from one value to another.
Expand Down Expand Up @@ -111,7 +118,7 @@ extension SessionInternal on Session {
}

static SyncProgress createSyncProgress(int transferredBytes, int transferableBytes) =>
SyncProgress(transferredBytes: transferredBytes, transferableBytes: transferableBytes);
SyncProgress._(progressEstimate: SyncProgress._calculateProgress(transferred: transferredBytes, transferable: transferableBytes));
}

abstract interface class ProgressNotificationsController {
Expand All @@ -136,7 +143,7 @@ class SessionProgressNotificationsController implements ProgressNotificationsCon

@override
void onProgress(int transferredBytes, int transferableBytes) {
_streamController.add(SyncProgress(transferredBytes: transferredBytes, transferableBytes: transferableBytes));
_streamController.add(SyncProgress._(progressEstimate: SyncProgress._calculateProgress(transferred: transferredBytes, transferable: transferableBytes)));

if (transferredBytes >= transferableBytes && _mode == ProgressMode.forCurrentlyOutstandingWork) {
_streamController.close();
Expand Down
14 changes: 7 additions & 7 deletions packages/realm_dart/test/realm_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1328,17 +1328,17 @@ void main() {
final user = await app.logIn(credentials);
final configuration = Configuration.flexibleSync(user, getSyncSchema());

int transferredBytes = -1;
double progress = -1;
final completer = Completer<void>();
var syncedRealm = await getRealmAsync(configuration, onProgressCallback: (syncProgress) {
transferredBytes = syncProgress.transferredBytes;
if (syncProgress.transferredBytes == syncProgress.transferableBytes) {
progress = syncProgress.progressEstimate;
if (syncProgress.progressEstimate == 1.0) {
completer.complete();
}
});
completer.future.timeout(Duration(milliseconds: 300), onTimeout: () => throw Exception("onProgressCallback did not happen."));
expect(syncedRealm.isClosed, false);
expect(transferredBytes, greaterThan(-1));
expect(progress, greaterThan(-1));
});

baasTest('Realm.open (flexibleSync) - download a populated realm', (appConfiguration) async {
Expand All @@ -1357,16 +1357,16 @@ void main() {
final config = await _subscribeForAtlasAddedData(app);

int printCount = 0;
int transferredBytes = 0;
double progress = 0;

final syncedRealm = await getRealmAsync(config, onProgressCallback: (syncProgress) {
printCount++;
transferredBytes = syncProgress.transferredBytes;
progress = syncProgress.progressEstimate;
});

expect(syncedRealm.isClosed, false);
expect(printCount, isNot(0));
expect(transferredBytes, greaterThan(19)); //19 bytes is the empty realm
expect(progress, 1.0);
});

baasTest('Realm.open (flexibleSync) - listen and cancel download progress of a populated realm', (appConfiguration) async {
Expand Down
41 changes: 13 additions & 28 deletions packages/realm_dart/test/session_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -162,20 +162,11 @@ void main() {
final data = StreamProgressData();
final stream = realm.syncSession.getProgressStream(direction, mode);
data.subscription = stream.listen((event) {
expect(event.transferredBytes, greaterThanOrEqualTo(data.transferredBytes));
if (data.transferableBytes != 0) {
// We need to wait for the first event to store the total bytes we expect.
if (mode == ProgressMode.forCurrentlyOutstandingWork) {
// Transferable should not change after the first event
expect(event.transferableBytes, data.transferableBytes);
} else {
// For indefinite progress, we expect the transferable bytes to not decrease
expect(event.transferableBytes, greaterThanOrEqualTo(data.transferableBytes));
}
if (mode == ProgressMode.forCurrentlyOutstandingWork) {
expect(event.progressEstimate, greaterThanOrEqualTo(data.progressEstimate));
}

data.transferredBytes = event.transferredBytes;
data.transferableBytes = event.transferableBytes;
data.progressEstimate = event.progressEstimate;
data.callbacksInvoked++;
});

Expand All @@ -191,12 +182,11 @@ void main() {
await Future<void>.delayed(const Duration(milliseconds: 100));

expect(data.callbacksInvoked, greaterThan(0));
expect(data.transferableBytes, greaterThan(0));
expect(data.transferredBytes, greaterThan(0));
expect(data.progressEstimate, greaterThan(0));
if (expectDone) {
expect(data.transferredBytes, data.transferableBytes);
expect(data.progressEstimate, 1.0);
} else {
expect(data.transferredBytes, lessThanOrEqualTo(data.transferableBytes));
expect(data.progressEstimate, lessThanOrEqualTo(1.0));
}
expect(data.doneInvoked, expectDone);
}
Expand Down Expand Up @@ -245,9 +235,11 @@ void main() {

await realmA.syncSession.waitForUpload();
await validateData(uploadData);
expect(uploadData.progressEstimate, 1.0);

await realmB.syncSession.waitForDownload();
await validateData(downloadData);
expect(downloadData.progressEstimate, 1.0);

// Snapshot the current state, then add a new object. We should receive more notifications
final uploadSnapshot = StreamProgressData.snapshot(uploadData);
Expand All @@ -261,14 +253,12 @@ void main() {
await realmB.syncSession.waitForDownload();

await validateData(uploadData);
expect(uploadData.progressEstimate, 1.0);
await validateData(downloadData);
expect(downloadData.progressEstimate, 1.0);

expect(uploadData.transferredBytes, greaterThan(uploadSnapshot.transferredBytes));
expect(uploadData.transferableBytes, greaterThan(uploadSnapshot.transferableBytes));
expect(uploadData.callbacksInvoked, greaterThan(uploadSnapshot.callbacksInvoked));

expect(downloadData.transferredBytes, greaterThan(downloadSnapshot.transferredBytes));
expect(downloadData.transferableBytes, greaterThan(downloadSnapshot.transferableBytes));
expect(downloadData.callbacksInvoked, greaterThan(downloadSnapshot.callbacksInvoked));

await uploadData.subscription.cancel();
Expand Down Expand Up @@ -329,18 +319,13 @@ void main() {
}

class StreamProgressData {
int transferredBytes;
int transferableBytes;
double progressEstimate;
int callbacksInvoked;
bool doneInvoked;
late StreamSubscription<SyncProgress> subscription;

StreamProgressData({this.transferableBytes = 0, this.transferredBytes = 0, this.callbacksInvoked = 0, this.doneInvoked = false});
StreamProgressData({this.progressEstimate = 0, this.callbacksInvoked = 0, this.doneInvoked = false});

StreamProgressData.snapshot(StreamProgressData other)
: this(
transferableBytes: other.transferableBytes,
callbacksInvoked: other.callbacksInvoked,
doneInvoked: other.doneInvoked,
transferredBytes: other.transferredBytes);
: this(callbacksInvoked: other.callbacksInvoked, doneInvoked: other.doneInvoked, progressEstimate: other.progressEstimate);
}

0 comments on commit 465e3bd

Please sign in to comment.