Skip to content

Commit

Permalink
fix: finalize callback will be called after the source observable is …
Browse files Browse the repository at this point in the history
…torn down.

* test: add failing finalize call-order test

* fix: call finalize callback after unsubscription

* chore: remove unused import

* test: add finalize-after-teardown test

* fix: add finalize callback in operator call

Closes #5357
  • Loading branch information
cartant authored May 18, 2020
1 parent 75d4c2f commit 0d7b7c1
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 15 deletions.
35 changes: 34 additions & 1 deletion spec/operators/finalize-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { expect } from 'chai';
import { finalize, map, share } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { of, timer, interval, NEVER } from 'rxjs';
import { of, timer, interval, NEVER, Observable } from 'rxjs';
import { asInteropObservable } from '../helpers/interop-helper';

declare const type: Function;
Expand Down Expand Up @@ -172,4 +172,37 @@ describe('finalize operator', () => {
subscription.unsubscribe();
expect(finalized).to.be.true;
});

it('should finalize sources before sinks', () => {
const finalized: string[] = [];
of(42).pipe(
finalize(() => finalized.push('source')),
finalize(() => finalized.push('sink'))
).subscribe();
expect(finalized).to.deep.equal(['source', 'sink']);
});

it('should finalize after the teardown', () => {
const order: string[] = [];
const source = new Observable<void>(() => {
return () => order.push('teardown');
});
const subscription = source.pipe(
finalize(() => order.push('finalize'))
).subscribe();
subscription.unsubscribe();
expect(order).to.deep.equal(['teardown', 'finalize']);
});

it('should finalize after the teardown with synchronous completion', () => {
const order: string[] = [];
const source = new Observable<void>(subscriber => {
subscriber.complete();
return () => order.push('teardown');
});
source.pipe(
finalize(() => order.push('finalize'))
).subscribe();
expect(order).to.deep.equal(['teardown', 'finalize']);
});
});
17 changes: 3 additions & 14 deletions src/internal/operators/finalize.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { Observable } from '../Observable';
import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
import { subscribeWith } from '../util/subscribeWith';
Expand Down Expand Up @@ -69,18 +68,8 @@ class FinallyOperator<T> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return subscribeWith(source, new FinallySubscriber(subscriber, this.callback));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class FinallySubscriber<T> extends Subscriber<T> {
constructor(destination: Subscriber<T>, callback: () => void) {
super(destination);
this.add(new Subscription(callback));
const subscription = subscribeWith(source, subscriber);
subscription.add(this.callback);
return subscription;
}
}

0 comments on commit 0d7b7c1

Please sign in to comment.