Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Possible regression in buffer between 7.0.0-beta.4 and 7.0.0-beta.5 #6035

Closed
OliverJAsh opened this issue Feb 19, 2021 · 8 comments
Closed
Assignees

Comments

@OliverJAsh
Copy link
Contributor

OliverJAsh commented Feb 19, 2021

Bug Report

Current Behavior
Reduced test case: https://github.com/OliverJAsh/rxjs-v7-buffer-regression

I have the following operator:

import * as RxJS from "rxjs";
import * as RxJSOperators from "rxjs/operators";

// Buffer the values and emit only after a particular time span has passed without another source
// emission.
// Like `bufferTime`, but uses debouncing instead of throttling.
// https://stackoverflow.com/questions/50515357/debounce-and-buffer-an-rxjs-subscription/50519552#50519552
export const bufferDebounce = (ms: number) => <T>(
  source: RxJS.Observable<T>
): RxJS.Observable<T[]> =>
  // The observable will have multiple subscribers, because it is used by both `buffer` and
  // `debounceTime`. We want to share the execution.
  source.pipe(
    RxJSOperators.publish((published) =>
      published.pipe(
        RxJSOperators.buffer(published.pipe(RxJSOperators.debounceTime(ms)))
      )
    )
  );

https://github.com/OliverJAsh/rxjs-v7-buffer-regression/blob/42be6f30c5afa433abebd9d98f09b9da15d3efb2/bufferDebounce.ts#L1-L19

This test passes in v6 and v7.0.0-beta.4 but fails in v7.0.0-beta.5 and above:

  it(
    "works with observable that emits and completes immediately",
    marbles((m) => {
      const source$ = m.cold("(a|)");
      const ms = m.time("     |");
      const expected = "      (1|)";

      const actual$ = source$.pipe(bufferDebounce(ms));
      m.expect(actual$).toBeObservable(expected, { 1: ["a"] });
    })
  );

https://github.com/OliverJAsh/rxjs-v7-buffer-regression/blob/42be6f30c5afa433abebd9d98f09b9da15d3efb2/test.js#L53-L63

The test fails with:

  ● bufferDebounce › works with observable that emits and completes immediately

    expect(received).toStrictEqual(expected) // deep equality

    - Expected  - 10
    + Received  +  0

      Array [
        Object {
          "frame": 0,
          "notification": Object {
            "error": undefined,
    -       "kind": "N",
    -       "value": Array [
    -         "a",
    -       ],
    -     },
    -   },
    -   Object {
    -     "frame": 0,
    -     "notification": Object {
    -       "error": undefined,
            "kind": "C",
            "value": undefined,
          },
        },
      ]

I believe this is due to #5654 but I'm not sure if this is intentional.

Expected behavior
The test passes like it did before v7.0.0-beta.5.

Reproduction
See above

@josepot
Copy link
Contributor

josepot commented Feb 20, 2021

I've created this code-sandbox to better understand what's happening. This is its code:

import { Subject, timer } from "rxjs";
import { buffer } from "rxjs/operators";

console.clear();

const subject = new Subject<string>();
subject.pipe(buffer(timer(0))).subscribe(
  (x) => {
    console.log("value received", x);
  },
  (e) => {
    console.log("error", e);
  },
  () => {
    console.log("done!");
  }
);

subject.next("a");
subject.complete();
// Promise.resolve().then(() => subject.complete());
/*
setTimeout(() => {
  subject.complete();
}, 0);
*/

Prior to v7.0.0-beta.5: regardless of when subject.complete gets executed (synchronously, microtask or macrotask), the subscriber always receives the buffered value and the completion.

In v7.0.0-beta.5 and after: if the source completes synchronously or inside a microtask (Promise.resolve().then), then the subscriber doesn't receive any values, just the completion. However, if the complete happens inside a macrotask (setTimeout of 0), then the subscriber receives the buffered value and the completion.

This seems to indicate that since v7.0.0-beta.5 as soon as the source completes, then the subscription to closingNotifier observable gets closed and since this hasn't had the chance to emit anything, then no values get emitted.

IMO this change in behavior that we got on v7.0.0-beta.5 makes sense and it is consistent with what happens on v6 and v7 when the source completes after having emitted a value, if in that moment the closingNotifier hasn't emitted anything yet.

@OliverJAsh
Copy link
Contributor Author

OliverJAsh commented Feb 20, 2021

How would you explain this failing test (fails in beta.5, passes in beta.4)? (Maybe I should have started with this one.)

  it(
    "works with 0 timeout",
    marbles((m) => {
      const source$ = m.cold("--(ab)--c--(de|)");
      const ms = m.time("       |");
      const expected = "      --1-----2--(3|)";

      const actual$ = source$.pipe(bufferDebounce(ms));
      m.expect(actual$).toBeObservable(expected, {
        1: ["a", "b"],
        2: ["c"],
        3: ["d", "e"],
      });
    })
  );

https://github.com/OliverJAsh/rxjs-v7-buffer-regression/blob/305ea1fc05c9ef43981fe676f7d1754b609d9ee7/test.js#L22-L36

Fails with:

    - Expected  - 11
    + Received  +  0

    @@ -22,21 +22,10 @@
        },
        Object {
          "frame": 11,
          "notification": Object {
            "error": undefined,
    -       "kind": "N",
    -       "value": Array [
    -         "d",
    -         "e",
    -       ],
    -     },
    -   },
    -   Object {
    -     "frame": 11,
    -     "notification": Object {
    -       "error": undefined,
            "kind": "C",
            "value": undefined,
          },
        },
      ]

(You can clone the repo and run the tests using yarn && tsc bufferDebounce.ts && jest test.js.)

Here's the same example but reduced a bit more: https://codesandbox.io/s/buffer-regression-forked-vdu09?file=/src/index.ts

import { Subject } from "rxjs";
import { buffer, debounceTime } from "rxjs/operators";

console.clear();

const subject = new Subject<string>();
subject.pipe(buffer(subject.pipe(debounceTime(0)))).subscribe(
  (x) => {
    console.log("value received", x);
  },
  (e) => {
    console.log("error", e);
  },
  () => {
    console.log("done!");
  }
);

subject.next("a");
subject.next("b");

setTimeout(() => {
  subject.next("c");
}, 1000);

setTimeout(() => {
  subject.next("d");
  subject.next("e");
  subject.complete();
}, 2000);

v7.0.0-beta.4 and below:

image

v7.0.0-beta.5 and up:

image

It doesn't seem intuitive behaviour to me that buffer would ever lose values.

@OliverJAsh
Copy link
Contributor Author

OliverJAsh commented Feb 20, 2021

This seems to indicate that since v7.0.0-beta.5 as soon as the source completes, then the subscription to closingNotifier observable gets closed and since this hasn't had the chance to emit anything, then no values get emitted.

IMO this change in behavior that we got on v7.0.0-beta.5 makes sense and it is consistent with what happens on v6 and v7 when the source completes after having emitted a value, if in that moment the closingNotifier hasn't emitted anything yet.

I think I'm coming round to your point of view.

We can use prioritize from rxjs-etc to achieve the desired behaviour:

const subject = new Subject<string>();
const source = subject.pipe(
  prioritize((first, second) => {
    return second.pipe(buffer(first.pipe(debounceTime(0))));
  })
);

https://codesandbox.io/s/buffer-regression-forked-nh04w?file=/src/index.ts

and

import * as RxJS from "rxjs";
import * as RxJSOperators from "rxjs/operators";
import { prioritize } from "rxjs-etc/operators";

export const bufferDebounce = (ms: number) => <T>(
  source: RxJS.Observable<T>
): RxJS.Observable<T[]> =>
  source.pipe(
    prioritize((first, second) =>
      second.pipe(
        RxJSOperators.buffer(first.pipe(RxJSOperators.debounceTime(ms)))
      )
    )
  );

https://github.com/OliverJAsh/rxjs-v7-buffer-regression/blob/fix/bufferDebounce.ts

This fixes the tests which were failing before.

It might be worth listing this change as a potential breaking change in the v7 changelog, as it broke a few things in our app quite significantly (we use bufferDebounce(0) in some of our core data fetching logic).

@josepot
Copy link
Contributor

josepot commented Feb 20, 2021

FWIW I do think that having the resulting observable emit any buffered values upon the completion of the source would be a very reasonable behavior to expect from the buffer operator. So, if that's not the behavior that we want to have by default, at least there should be an optional parameter to enable that behavior.

@OliverJAsh
Copy link
Contributor Author

FWIW I do think that having the resulting observable emit any buffered values upon the completion of the source would be a very reasonable behavior to expect from the buffer operator. So, if that's not the behavior that we want to have by default, at least there should be an optional parameter to enable that behavior.

Agreed. I would be happy to send a PR but first it would be good to get input from a few other maintainers, just to make sure I do the right thing in the PR!

It might be worth listing this change as a potential breaking change in the v7 changelog, as it broke a few things in our app quite significantly (we use bufferDebounce(0) in some of our core data fetching logic).

I can send a PR for this too. Should I just add a breaking change to the existing v7.0.0-beta.5 entry in CHANGELOG.md?

For future reference, I created an even simpler test case for this breaking change. This test passes in v6 and v7.0.0-beta.4 but fails in v7.0.0-beta.5 and above:

const results: any[] = [];
const subject = new Subject<number>();

const source = subject.pipe(buffer(subject.pipe(debounceTime(0)))).subscribe({
  next: (value) => results.push(value),
  complete: () => results.push("complete")
});

subject.next(1);
expect(results).to.deep.equal([]);
subject.complete();
expect(results).to.deep.equal([[1], 'complete']);

https://codesandbox.io/s/buffer-regression-forked-kjdwc?file=/src/index.ts

@OliverJAsh
Copy link
Contributor Author

I'll re-open this so we don't forget to follow up on the above action points.

@OliverJAsh OliverJAsh reopened this Feb 22, 2021
@benlesh
Copy link
Member

benlesh commented Feb 22, 2021

This is actually related to an old bug here: #3990

We should always be emitting whatever is in the buffer when it closes. I suspect their will be complaints about occasional empty buffers at completion, but those are easily filtered.

@benlesh benlesh self-assigned this Feb 22, 2021
benlesh added a commit to benlesh/rxjs that referenced this issue Feb 22, 2021
…ingNotifier` active

Gives the author control over the emission of the final buffer. If the `closingNotifier` completes before the source does, no more buffers will be emitted. If the `closingNotifier` is still active when the source completes, then whatever is in the buffer at the time will be emitted from the resulting observable before it completes.

BREAKING CHANGE: Final buffered values will now be emitted from the resulting observable if the `closingNotifier` is still active. The simplest workaround if you want the original behavior (where you possibly miss values), is to add a `skipLast(1)` at the end. Otherwise, you can try to complete the `closingNotifier` prior to the completion of the source.

Fixes ReactiveX#3990, ReactiveX#6035
benlesh added a commit to benlesh/rxjs that referenced this issue Feb 22, 2021
…ingNotifier` active

Gives the author control over the emission of the final buffer. If the `closingNotifier` completes before the source does, no more buffers will be emitted. If the `closingNotifier` is still active when the source completes, then whatever is in the buffer at the time will be emitted from the resulting observable before it completes.

BREAKING CHANGE: Final buffered values will now be emitted from the resulting observable if the `closingNotifier` is still active. The simplest workaround if you want the original behavior (where you possibly miss values), is to add a `skipLast(1)` at the end. Otherwise, you can try to complete the `closingNotifier` prior to the completion of the source.

Fixes ReactiveX#3990, ReactiveX#6035
benlesh added a commit to benlesh/rxjs that referenced this issue Feb 22, 2021
…ingNotifier` active

Gives the author control over the emission of the final buffer. If the `closingNotifier` completes before the source does, no more buffers will be emitted. If the `closingNotifier` is still active when the source completes, then whatever is in the buffer at the time will be emitted from the resulting observable before it completes.

BREAKING CHANGE: Final buffered values will now be emitted from the resulting observable if the `closingNotifier` is still active. The simplest workaround if you want the original behavior (where you possibly miss values), is to add a `skipLast(1)` at the end. Otherwise, you can try to complete the `closingNotifier` prior to the completion of the source.

Fixes ReactiveX#3990, ReactiveX#6035
benlesh added a commit to benlesh/rxjs that referenced this issue Feb 23, 2021
BREAKING CHANGE: Final buffered values will now always be emitted. To get the same behavior as the previous release, you can use `endWith` and `skipLast(1)`, like so: `source$.pipe(buffer(notifier$.pipe(endWith(true))), skipLast(1))`

Fixes ReactiveX#3990, ReactiveX#6035
benlesh added a commit that referenced this issue Feb 24, 2021
BREAKING CHANGE: Final buffered values will now always be emitted. To get the same behavior as the previous release, you can use `endWith` and `skipLast(1)`, like so: `source$.pipe(buffer(notifier$.pipe(endWith(true))), skipLast(1))`

Fixes #3990, #6035
@OliverJAsh
Copy link
Contributor Author

Closed by #6042 (IIUC)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants