Skip to content

Commit

Permalink
Revert "Cancellable forEach" (#3987)
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh authored Aug 6, 2018
1 parent bb77e25 commit 1f056f5
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 217 deletions.
132 changes: 31 additions & 101 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import * as Rx from 'rxjs/Rx';
import { Observer, TeardownLogic } from '../src/internal/types';
import { cold, expectObservable, expectSubscriptions } from './helpers/marble-testing';
import { map } from '../src/internal/operators/map';
import * as HostReportErrorModule from '../src/internal/util/hostReportError';
import { noop } from '../src/internal/util/noop';
import { Subscription, interval, timer, Observable, Operator, config, Subscriber, Subject } from 'rxjs';

//tslint:disable-next-line
require('./helpers/test-helper');

declare const asDiagram: any, rxTestScheduler: any;

const Observable = Rx.Observable;

declare const __root__: any;

function expectFullObserver(val: any) {
Expand Down Expand Up @@ -80,7 +82,7 @@ describe('Observable', () => {
it('should allow Promise to be globally configured', (done) => {
let wasCalled = false;

config.Promise = function MyPromise(callback: any) {
Rx.config.Promise = function MyPromise(callback: any) {
wasCalled = true;
return new Promise<number>(callback);
} as any;
Expand Down Expand Up @@ -112,7 +114,7 @@ describe('Observable', () => {
});
});

it('should handle a synchronous throw from the next handler', (done: MochaDone) => {
it('should handle a synchronous throw from the next handler', () => {
const expected = new Error('I told, you Bobby Boucher, threes are the debil!');
const syncObservable = new Observable<number>((observer) => {
observer.next(1);
Expand All @@ -123,20 +125,21 @@ describe('Observable', () => {

const results: Array<number | Error> = [];

syncObservable.forEach((x) => {
return syncObservable.forEach((x) => {
results.push(x);
if (x === 3) {
throw expected;
}
})
.catch(err => results.push(err))
.then(
}).then(
() => {
expect(results).to.deep.equal([1, 2, 3, expected]);
throw new Error('should not be called');
},
(err) => {
results.push(err);
// Since the consuming code can no longer interfere with the synchronous
// producer, the remaining results are nexted.
expect(results).to.deep.equal([1, 2, 3, 4, expected]);
}
).then(
() => done(),
done,
);
});

Expand Down Expand Up @@ -168,79 +171,6 @@ describe('Observable', () => {
}
);
});

it('should be cancellable with a second Subscription argument', (done: MochaDone) => {
const source = interval(1000);
const subs = new Subscription();
let called = 0;
let completed = 0;
let error: any = undefined;

source.forEach(() => called++, subs)
.then(
() => completed++,
(err) => error = err,
);

subs.unsubscribe();

// wait a tick
Promise.resolve().then(() => {
expect(called).to.equal(0);
expect(completed).to.equal(0);
expect(error.message).to.equal('object unsubscribed');
})
.then(
() => done(),
done
);
});

it('should be cancellable with a second Subscription argument, if the subscription is already unsubbed', (done: MochaDone) => {
const source = interval(1000);
const subs = new Subscription();
let called = 0;
let completed = 0;
let error: any = undefined;

subs.unsubscribe();

source.forEach(() => called++, subs)
.then(
() => completed++,
(err) => error = err,
)
.then(() => {
expect(called).to.equal(0);
expect(completed).to.equal(0);
expect(error.message).to.equal('object unsubscribed');
})
.then(
() => done(),
done,
);
});

it('should throw an error if unsubscribed in async-await', (done: MochaDone) => {
async function test() {
const subs = new Subscription();
const results: any[] = [];
const observableComplete = timer(1000).forEach(x => results.push(x), subs);

// async, but should be the 1000ms above.
Promise.resolve().then(() => subs.unsubscribe());

let error: any = undefined;
try {
await observableComplete;
} catch (err) {
error = err;
}
expect(error.message).to.equal('object unsubscribed');
}

test().then(() => done(), done);
});
});

describe('subscribe', () => {
Expand Down Expand Up @@ -374,7 +304,7 @@ describe('Observable', () => {
const sub = source.subscribe(() => {
//noop
});
expect(sub instanceof Subscription).to.be.true;
expect(sub instanceof Rx.Subscription).to.be.true;
expect(unsubscribeCalled).to.be.false;
expect(sub.unsubscribe).to.be.a('function');

Expand Down Expand Up @@ -625,10 +555,10 @@ describe('Observable', () => {
warnCalledWith.push(args);
};

config.useDeprecatedSynchronousErrorHandling = true;
Rx.config.useDeprecatedSynchronousErrorHandling = true;
expect(warnCalledWith.length).to.equal(1);

config.useDeprecatedSynchronousErrorHandling = false;
Rx.config.useDeprecatedSynchronousErrorHandling = false;
expect(logCalledWith.length).to.equal(1);

console.log = _log;
Expand All @@ -640,7 +570,7 @@ describe('Observable', () => {
beforeEach(() => {
const _warn = console.warn;
console.warn = noop;
config.useDeprecatedSynchronousErrorHandling = true;
Rx.config.useDeprecatedSynchronousErrorHandling = true;
console.warn = _warn;
});

Expand All @@ -654,7 +584,7 @@ describe('Observable', () => {
observer.next(1);
});

const sink = Subscriber.create(() => {
const sink = Rx.Subscriber.create(() => {
throw 'error!';
});

Expand All @@ -666,7 +596,7 @@ describe('Observable', () => {
afterEach(() => {
const _log = console.log;
console.log = noop;
config.useDeprecatedSynchronousErrorHandling = false;
Rx.config.useDeprecatedSynchronousErrorHandling = false;
console.log = _log;
});
});
Expand Down Expand Up @@ -719,7 +649,7 @@ describe('Observable.create', () => {

it('should provide an observer to the function', () => {
let called = false;
const result = Observable.create((observer: Observer<any>) => {
const result = Observable.create((observer: Rx.Observer<any>) => {
called = true;
expectFullObserver(observer);
observer.complete();
Expand Down Expand Up @@ -750,13 +680,13 @@ describe('Observable.create', () => {
/** @test {Observable} */
describe('Observable.lift', () => {

class MyCustomObservable<T> extends Observable<T> {
class MyCustomObservable<T> extends Rx.Observable<T> {
static from<T>(source: any) {
const observable = new MyCustomObservable<T>();
observable.source = <Observable<T>> source;
observable.source = <Rx.Observable<T>> source;
return observable;
}
lift<R>(operator: Operator<T, R>): Observable<R> {
lift<R>(operator: Rx.Operator<T, R>): Rx.Observable<R> {
const observable = new MyCustomObservable<R>();
(<any>observable).source = this;
(<any>observable).operator = operator;
Expand Down Expand Up @@ -793,7 +723,7 @@ describe('Observable.lift', () => {
observer.next(3);
observer.complete();
})
.multicast(() => new Subject<number>())
.multicast(() => new Rx.Subject<number>())
.refCount()
.map((x) => { return 10 * x; });

Expand All @@ -818,7 +748,7 @@ describe('Observable.lift', () => {
observer.next(3);
observer.complete();
})
.multicast(() => new Subject<number>(), (shared) => shared.map((x) => { return 10 * x; }));
.multicast(() => new Rx.Subject<number>(), (shared) => shared.map((x) => { return 10 * x; }));

expect(result instanceof MyCustomObservable).to.be.true;

Expand Down Expand Up @@ -907,7 +837,7 @@ describe('Observable.lift', () => {
// The custom Subscriber
const log: Array<string> = [];

class LogSubscriber<T> extends Subscriber<T> {
class LogSubscriber<T> extends Rx.Subscriber<T> {
next(value?: T): void {
log.push('next ' + value);
if (!this.isStopped) {
Expand All @@ -917,18 +847,18 @@ describe('Observable.lift', () => {
}

// The custom Operator
class LogOperator<T, R> implements Operator<T, R> {
constructor(private childOperator: Operator<T, R>) {
class LogOperator<T, R> implements Rx.Operator<T, R> {
constructor(private childOperator: Rx.Operator<T, R>) {
}

call(subscriber: Subscriber<R>, source: any): TeardownLogic {
call(subscriber: Rx.Subscriber<R>, source: any): TeardownLogic {
return this.childOperator.call(new LogSubscriber<R>(subscriber), source);
}
}

// The custom Observable
class LogObservable<T> extends Observable<T> {
lift<R>(operator: Operator<T, R>): Observable<R> {
lift<R>(operator: Rx.Operator<T, R>): Rx.Observable<R> {
const observable = new LogObservable<R>();
(<any>observable).source = this;
(<any>observable).operator = new LogOperator(operator);
Expand Down
Loading

0 comments on commit 1f056f5

Please sign in to comment.