Skip to content

Commit

Permalink
fix(shareReplay): properly uses lift (#2924)
Browse files Browse the repository at this point in the history
resolves #2921
  • Loading branch information
benlesh authored Oct 9, 2017
1 parent 4dc73e4 commit 3d9cf87
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 4 deletions.
32 changes: 32 additions & 0 deletions spec/operators/shareReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,36 @@ describe('Observable.prototype.shareReplay', () => {
rxTestScheduler.flush();
expect(results).to.deep.equal([0, 1, 2, 4, 5, 6, 7, 8, 9]);
});

it('should not break lift() composability', (done: MochaDone) => {
class MyCustomObservable<T> extends Rx.Observable<T> {
lift<R>(operator: Rx.Operator<T, R>): Rx.Observable<R> {
const observable = new MyCustomObservable<R>();
(<any>observable).source = this;
(<any>observable).operator = operator;
return observable;
}
}

const result = new MyCustomObservable((observer: Rx.Observer<number>) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).shareReplay();

expect(result instanceof MyCustomObservable).to.be.true;

const expected = [1, 2, 3];

result
.subscribe((n: any) => {
expect(expected.length).to.be.greaterThan(0);
expect(n).to.equal(expected.shift());
}, (x) => {
done(new Error('should not be called'));
}, () => {
done();
});
});
});
14 changes: 10 additions & 4 deletions src/operators/shareReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,24 @@ import { ReplaySubject } from '../ReplaySubject';
import { IScheduler } from '../Scheduler';
import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction } from '../interfaces';
import { Subscriber } from '../Subscriber';

/**
* @method shareReplay
* @owner Observable
*/
export function shareReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: IScheduler ): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => source.lift(shareReplayOperator(bufferSize, windowTime, scheduler));
}

export function shareReplayOperator<T>(bufferSize?: number, windowTime?: number, scheduler?: IScheduler) {
let subject: ReplaySubject<T>;
let refCount = 0;
let subscription: Subscription;
let hasError = false;
let isComplete = false;

return (source: Observable<T>) => new Observable<T>(observer => {
return function shareReplayOperation(this: Subscriber<T>, source: Observable<T>) {
refCount++;
if (!subject || hasError) {
hasError = false;
Expand All @@ -32,7 +38,7 @@ export function shareReplay<T>(bufferSize?: number, windowTime?: number, schedul
});
}

const innerSub = subject.subscribe(observer);
const innerSub = subject.subscribe(this);

return () => {
refCount--;
Expand All @@ -41,5 +47,5 @@ export function shareReplay<T>(bufferSize?: number, windowTime?: number, schedul
subscription.unsubscribe();
}
};
});
};
};
};

0 comments on commit 3d9cf87

Please sign in to comment.