-
Notifications
You must be signed in to change notification settings - Fork 3k
/
finalize.ts
94 lines (90 loc) · 3.01 KB
/
finalize.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { Observable } from '../Observable';
import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
/**
* Returns an Observable that mirrors the source Observable, but will call a specified function when
* the source terminates on complete or error.
* The specified function will also be called when the subscriber explicitly unsubscribes.
*
* ## Examples
* Execute callback function when the observable completes
*
* ```ts
* import { interval } from 'rxjs';
* import { take, finalize } from 'rxjs/operators';
*
* // emit value in sequence every 1 second
* const source = interval(1000);
* const example = source.pipe(
* take(5), //take only the first 5 values
* finalize(() => console.log('Sequence complete')) // Execute when the observable completes
* )
* const subscribe = example.subscribe(val => console.log(val));
*
* // results:
* // 0
* // 1
* // 2
* // 3
* // 4
* // 'Sequence complete'
* ```
*
* Execute callback function when the subscriber explicitly unsubscribes
*
* ```ts
* import { interval, timer, noop } from 'rxjs';
* import { finalize, tap } from 'rxjs/operators';
*
* const source = interval(100).pipe(
* finalize(() => console.log('[finalize] Called')),
* tap(() => console.log('[next] Called'),
* () => console.log('[error] Not called'),
* () => console.log('[tap] Not called')),
* );
*
* const sub = source.subscribe(x => console.log(x), noop, () => console.log('[complete] Not called'));
*
* timer(150).subscribe(() => sub.unsubscribe());
*
* // results:
* // '[next] Called'
* // 0
* // '[finalize] Called'
* ```
*
* @param {function} callback Function to be called when source terminates.
* @return {Observable} An Observable that mirrors the source, but will call the specified function on termination.
* @method finalize
* @owner Observable
*/
export function finalize<T>(callback: () => void): MonoTypeOperatorFunction<T> {
  // The returned operator function lifts the source with an operator that
  // carries `callback`; the callback is handed over as-is and not invoked here.
  return function finalizeOperatorFunction(source: Observable<T>): Observable<T> {
    const operator = new FinallyOperator<T>(callback);
    return source.lift(operator);
  };
}
/**
 * Operator handed to `lift` by `finalize`. When applied, it subscribes a
 * FinallySubscriber (wrapping the downstream subscriber and the stored
 * callback) to the source.
 */
class FinallyOperator<T> implements Operator<T, T> {
  private callback: () => void;

  constructor(callback: () => void) {
    this.callback = callback;
  }

  /**
   * Subscribes a new FinallySubscriber to `source` and returns the resulting
   * subscription.
   *
   * @param subscriber The downstream subscriber to forward to.
   * @param source The source being subscribed to (untyped here).
   * @return The subscription produced by `source.subscribe`.
   */
  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    const finalizer = new FinallySubscriber(subscriber, this.callback);
    const result = source.subscribe(finalizer);

    // Usual case: subscribe hands back the very subscriber we passed in.
    if (result === finalizer) {
      return result;
    }

    // Interop case: the subscriber was wrapped, so the wrapper is what came
    // back. Add our subscriber to it so unsubscription chains correctly.
    result.add(finalizer);
    return result;
  }
}
/**
 * Subscriber used by the finalize operator: it passes `destination` to the
 * base Subscriber and registers `callback` as an additional Subscription on
 * itself.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class FinallySubscriber<T> extends Subscriber<T> {
  constructor(destination: Subscriber<T>, callback: () => void) {
    super(destination);
    // Wrap the callback in a Subscription and attach it to this subscriber;
    // per the operator's documented contract it runs on termination or
    // explicit unsubscribe.
    const teardown = new Subscription(callback);
    this.add(teardown);
  }
}