Fix operators that fail the synchronous firehose test #5658

Closed
13 of 17 tasks
cartant opened this issue Aug 19, 2020 · 3 comments

@cartant
Collaborator

cartant commented Aug 19, 2020

The following operators currently fail tests that were introduced in #5652 and those failing tests are currently skipped:

The tests are marked with // TODO: fix firehose unsubscription comments.

Update: it's not going to be possible to make the multicast-based tests pass.

@kobelb

kobelb commented Sep 18, 2020

I've noticed some oddities with the behavior of exhaustMap when there's a synchronous subscription. I'm curious whether this behavior is related to this bug, or something else entirely.

Synchronous Subscription

stackblitz

From within a synchronous subscription to the exhaustMap, the inner observable isn't marked as completed when the next callback is called:

import { BehaviorSubject } from 'rxjs';
import { exhaustMap, tap } from 'rxjs/operators';

const subject = new BehaviorSubject<number>(0);
const obs$ = subject.pipe(
  tap(() => console.log('before')),
  exhaustMap((val) => {
    return Promise.resolve(val * 100);
  }),
  tap(() => console.log('after')),
);

let i = 0;
obs$.subscribe(x => {
  i++;
  console.log(x);
  if (i === 1) {
    subject.next(i); // the `exhaustMap` causes this to have no effect
  }
});

// output:
//
// before
// after
// 0
// before

Asynchronous Subscription

stackblitz

However, when there's an asynchronous subscription to the exhaustMap, the inner observable is marked as completed when the next callback is called:

import { BehaviorSubject } from 'rxjs';
import { exhaustMap, switchMap, tap } from 'rxjs/operators';

const subject = new BehaviorSubject<number>(0);
const obs$ = subject.pipe(
  tap(() => console.log('before')),
  exhaustMap((val) => {
    return Promise.resolve(val * 100);
  }),
  switchMap((val) => {
    return Promise.resolve(val);
  }),
  tap(() => console.log('after')),
);

let i = 0;
obs$.subscribe(x => {
  i++;
  console.log(x);
  if (i === 1) {
    subject.next(i);
  }
});

// output:
//
// before
// after
// 0
// before
// after
// 100

Cause

I believe that this is caused by the implementation of subscribeToPromise. When the promise resolves, it calls subscriber.next(value) and only then calls subscriber.complete(). When the downstream subscription is asynchronous, the inner subscriber is marked as complete before the next callback runs; when it is synchronous, the next callback runs before complete() has been called. It feels like these two code paths should behave the same; otherwise, the "implementation details" of how the observable is created are leaked to all consumers.
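
To make the ordering concrete, here is a minimal sketch in plain TypeScript that does not use RxJS at all. It is only a simplified model of the mechanism described above, not the library's implementation: createExhaustGate, promiseLike, and the hand-flushed microtasks queue are hypothetical names invented for this illustration.

```typescript
// Simplified model, NOT the RxJS source. All names here are hypothetical.
type Observer<T> = { next(value: T): void; complete(): void };

const log: string[] = [];

// Stand-in for the microtask queue, flushed by hand so the order is explicit.
const microtasks: Array<() => void> = [];

// Models a promise-backed inner source: when it "resolves", it delivers
// the value first and only afterwards signals completion.
const promiseLike = (value: number) => (inner: Observer<number>): void => {
  microtasks.push(() => {
    inner.next(value);
    inner.complete();
  });
};

// Models an exhaustMap-like gate: source values are dropped while an
// inner source is still active.
function createExhaustGate(
  project: (value: number) => (inner: Observer<number>) => void,
  downstream: (value: number) => void,
): (value: number) => void {
  let innerActive = false;
  return (value: number): void => {
    if (innerActive) {
      log.push(`ignored ${value}`);
      return;
    }
    innerActive = true;
    log.push(`accepted ${value}`);
    project(value)({
      next: downstream,
      complete: () => {
        innerActive = false;
      },
    });
  };
}

let count = 0;
const push: (value: number) => void = createExhaustGate(
  (value) => promiseLike(value * 100),
  (x) => {
    log.push(`received ${x}`);
    count++;
    if (count === 1) {
      push(1); // synchronous re-entry: the inner source has not completed yet
    }
  },
);

push(0);
while (microtasks.length > 0) {
  microtasks.shift()!();
}

console.log(log); // ["accepted 0", "received 0", "ignored 1"]
```

In this model the re-entrant push(1) happens inside the next callback, before complete() has cleared the flag, so the value is dropped, which mirrors the output of the synchronous example.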

@josepot
Contributor

josepot commented Sep 18, 2020

👋 @kobelb

This is the behavior that I would expect, because exhaustMap ignores any emissions from its source until the inner observable completes. In the first example, a value is pushed into the subject before the inner observable has had a chance to complete, so that value is ignored.

> the inner observable isn't marked as completed when the next callback is called

Keep in mind that once an observable has completed, it can no longer emit. So it's not possible for it to complete before emitting the value: first it emits, and then it completes.

One way to get the behavior that you are expecting would be to observe the resulting observable of exhaustMap through the asapScheduler, like this:

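import { BehaviorSubject, asapScheduler } from 'rxjs';
import { exhaustMap, observeOn, tap } from 'rxjs/operators';

const subject = new BehaviorSubject<number>(0);
const obs$ = subject.pipe(
  tap(() => console.log('before')),
  exhaustMap((val) => {
    return Promise.resolve(val * 100);
  }),
  observeOn(asapScheduler),
  tap(() => console.log('after')),
);

let i = 0;
obs$.subscribe(x => {
  i++;
  console.log(x);
  if (i === 1) {
    // with `observeOn(asapScheduler)` the inner observable has already
    // completed by now, so `exhaustMap` processes this value
    subject.next(i);
  }
});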
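
The effect of deferring delivery can be sketched with a small plain-TypeScript model that does not use RxJS. This is only an illustration under stated assumptions: pushToGate, deliver, and the hand-flushed microtasks queue are hypothetical names, and the queue merely stands in for what a scheduler such as asapScheduler would do.

```typescript
// Simplified model, NOT the RxJS source. All names here are hypothetical.
type Observer<T> = { next(value: T): void; complete(): void };

const log: string[] = [];

// Stand-in for the microtask queue, flushed by hand so the order is explicit.
const microtasks: Array<() => void> = [];

let innerActive = false;
let count = 0;

// The final consumer: re-enters the gate once, like the subscriber above.
function deliver(x: number): void {
  log.push(`received ${x}`);
  count++;
  if (count === 1) {
    pushToGate(1);
  }
}

// Models an exhaustMap-like gate whose output is delivered on a later task.
function pushToGate(value: number): void {
  if (innerActive) {
    log.push(`ignored ${value}`);
    return;
  }
  innerActive = true;
  log.push(`accepted ${value}`);
  const inner: Observer<number> = {
    // Deferred delivery: queue the value instead of handing it over now.
    next: (x) => {
      microtasks.push(() => deliver(x));
    },
    complete: () => {
      innerActive = false;
    },
  };
  // The promise-like inner source: value first, completion second.
  microtasks.push(() => {
    inner.next(value * 100);
    inner.complete();
  });
}

pushToGate(0);
while (microtasks.length > 0) {
  microtasks.shift()!();
}

console.log(log); // ["accepted 0", "received 0", "accepted 1", "received 100"]
```

Because the value is queued rather than delivered inside next, complete() runs before the consumer sees it, so the re-entrant emission is accepted in this model.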

> It feels like these two code-paths should behave the same

I disagree. In the second snippet, the inner observable of switchMap creates a microtask, which ensures that by the time the resulting observable emits, the inner observable of exhaustMap has completed. The two snippets would behave the same if the inner observable of switchMap were synchronous:

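import { BehaviorSubject, of } from 'rxjs';
import { exhaustMap, switchMap, tap } from 'rxjs/operators';

const subject = new BehaviorSubject<number>(0);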
const obs$ = subject.pipe(
  tap(() => console.log('before')),
  exhaustMap((val) => {
    return Promise.resolve(val * 100);
  }),
  switchMap((val) => {
    return of(val);
  }),
  tap(() => console.log('after')),
);

cartant added a commit to cartant/rxjs that referenced this issue Sep 24, 2020
cartant added a commit to cartant/rxjs that referenced this issue Sep 25, 2020
miginmrs added a commit to miginmrs/rxjs that referenced this issue Oct 16, 2020
- use known subscriber when possible instead of returned subscription
- remove unused unsubscribe from refCount Operator
- add teardown logic before calling subscribe when possible
- enable remaining synchronous firehose tests

fixes ReactiveX#5658
@miginmrs

I had to make a change in ConnectableObservable by adding an internal method prepare to prepare the connect subscription before calling connect. I didn't understand why "it's not going to be possible to make the multicast-based tests pass".
