diff --git a/spec/operators/share-spec.ts b/spec/operators/share-spec.ts index 2cb5f9c881..cf9c31494b 100644 --- a/spec/operators/share-spec.ts +++ b/spec/operators/share-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { share, retry, mergeMapTo, mergeMap, tap, repeat, take, takeUntil, takeWhile, materialize } from 'rxjs/operators'; -import { Observable, EMPTY, NEVER, of, Subject, Observer, from } from 'rxjs'; +import { share, retry, mergeMapTo, mergeMap, tap, repeat, take, takeUntil, takeWhile, materialize, map, startWith, withLatestFrom } from 'rxjs/operators'; +import { Observable, EMPTY, NEVER, of, Subject, defer } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; import sinon = require('sinon'); @@ -332,6 +332,25 @@ describe('share', () => { expect(sideEffects).to.deep.equal([0, 1, 2]); }); + + it('should not fail on reentrant subscription', () => { + // https://github.com/ReactiveX/rxjs/issues/6144 + const source = cold('(123|)'); + const subs = ['(^!) ']; + const expected = '(136|)'; + + const deferred = defer(() => shared).pipe( + startWith(0) + ); + const shared: Observable = source.pipe( + withLatestFrom(deferred), + map(([a, b]) => String(Number(a) + Number(b))), + share() + ); + + expectObservable(shared).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); }); describe('share(config)', () => { diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index e12fbc8692..ea0c374786 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -1,7 +1,6 @@ import { Subject } from '../Subject'; - import { MonoTypeOperatorFunction, OperatorFunction, SubjectLike } from '../types'; -import { Subscription } from '../Subscription'; +import { SafeSubscriber } from '../Subscriber'; import { from } from '../observable/from'; import { operate } from '../util/lift'; @@ -94,7 +93,7 @@ export function share(options?: ShareConfig): OperatorFunction { options = options || {}; const { connector = () => new Subject(), resetOnComplete = true, resetOnError = true, resetOnRefCountZero = true } = options; - let connection: Subscription | null = null; + let connection: SafeSubscriber | null = null; let subject: SubjectLike | null = null; let refCount = 0; let hasCompleted = false; @@ -118,9 +117,14 @@ export function share(options?: ShareConfig): OperatorFunction { subject.subscribe(subscriber); if (!connection) { - connection = from(source).subscribe({ - next: (value) => subject!.next(value), - error: (err) => { + // We need to create a subscriber here - rather than pass an observer and + // assign the returned subscription to connection - because it's possible + // for reentrant subscriptions to the shared observable to occur and in + // those situations we want connection to be already-assigned so that we + // don't create another connection to the source. + connection = new SafeSubscriber({ + next: (value: T) => subject!.next(value), + error: (err: any) => { hasErrored = true; // We need to capture the subject before // we reset (if we need to reset). @@ -141,6 +145,7 @@ export function share(options?: ShareConfig): OperatorFunction { dest.complete(); }, }); + from(source).subscribe(connection); } // This is also added to `subscriber`, technically.