Skip to content

Commit

Permalink
fix(throttle): now supports synchronous duration selectors
Browse files Browse the repository at this point in the history
* test: enable timeoutWith firehose test

* test: enable repeat/retry firehose tests

* test: fix skipLast firehose expectation

* test: enable throttle firehose test

* fix(throttle): support sync duration selector

* chore: fix spelling in comment

* test: add comment re: closed chain breakage

Closes #5658.
  • Loading branch information
cartant authored Sep 29, 2020
1 parent 868c02b commit 55e953e
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 16 deletions.
16 changes: 16 additions & 0 deletions spec/operators/multicast-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,22 @@ describe('multicast operator', () => {
});

// TODO: fix firehose unsubscription
// AFAICT, it's not possible for multicast observables to support ASAP
// unsubscription from synchronous firehose sources. The problem is that the
// chaining of the closed 'signal' is broken by the subject. For example,
// here:
//
// https://github.com/ReactiveX/rxjs/blob/2d5e4d5bd7b684a912485e1c1583ba3d41c8308e/src/internal/operators/multicast.ts#L53
//
// The subject is passed to subscribe. However, in the subscribe
// implementation a SafeSubcriber is created with the subject as the
// observer:
//
// https://github.com/ReactiveX/rxjs/blob/2d5e4d5bd7b684a912485e1c1583ba3d41c8308e/src/internal/Observable.ts#L210
//
// That breaks the chaining of closed - i.e. even if the unsubscribe is
// called on the subject, closing it, the SafeSubscriber's closed property
// won't reflect that.
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>(subscriber => {
Expand Down
3 changes: 1 addition & 2 deletions spec/operators/repeat-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,7 @@ describe('repeat operator', () => {
});
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>(subscriber => {
// This will check to see if the subscriber was closed on each loop
Expand Down
3 changes: 1 addition & 2 deletions spec/operators/repeatWhen-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,7 @@ describe('repeatWhen operator', () => {
expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'complete', 'teardown'])
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>(subscriber => {
// This will check to see if the subscriber was closed on each loop
Expand Down
3 changes: 1 addition & 2 deletions spec/operators/retry-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,7 @@ describe('retry operator', () => {
});
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>(subscriber => {
// This will check to see if the subscriber was closed on each loop
Expand Down
8 changes: 5 additions & 3 deletions spec/operators/skipLast-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ describe('skipLast operator', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>(subscriber => {
// This will check to see if the subscriber was closed on each loop
Expand All @@ -165,6 +164,9 @@ describe('skipLast operator', () => {
take(3),
).subscribe(() => { /* noop */ });

expect(sideEffects).to.deep.equal([0, 1, 2]);
// This expectation might seem a little strange, but the implementation of
// skipLast works by eating the number of elements that are to be skipped,
// so it will consume the number skipped in addition to the number taken.
expect(sideEffects).to.deep.equal([0, 1, 2, 3]);
});
});
3 changes: 1 addition & 2 deletions spec/operators/throttle-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,7 @@ describe('throttle operator', () => {
})
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>(subscriber => {
// This will check to see if the subscriber was closed on each loop
Expand Down
3 changes: 1 addition & 2 deletions spec/operators/timeoutWith-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,7 @@ describe('timeoutWith operator', () => {
});
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>(subscriber => {
// This will check to see if the subscriber was closed on each loop
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/multicast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export function multicast<T, R>(
// Intentionally terse code: Subscribe to the result of the selector,
// then immediately connect the source through the subject, adding
// that to the resulting subscription. The act of subscribing with `this`,
// the primary destination subscriber, will automatically add the subcription
// the primary destination subscriber, will automatically add the subscription
// to the result.
selector(subject).subscribe(subscriber).add(source.subscribe(subject));
});
Expand Down
9 changes: 7 additions & 2 deletions src/internal/operators/throttle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,20 @@ export function throttle<T>(
source.subscribe(
new OperatorSubscriber(
subscriber,
// Regarding the presence of throttled.closed in the following
// conditions, if a synchronous duration selector is specified - weird,
// but legal - an already-closed subscription will be assigned to
// throttled, so the subscription's closed property needs to be checked,
// too.
(value) => {
hasValue = true;
sendValue = value;
!throttled && (leading ? send() : throttle(value));
!(throttled && !throttled.closed) && (leading ? send() : throttle(value));
},
undefined,
() => {
isComplete = true;
!(trailing && hasValue && throttled) && subscriber.complete();
!(trailing && hasValue && throttled && !throttled.closed) && subscriber.complete();
}
)
);
Expand Down

0 comments on commit 55e953e

Please sign in to comment.