diff --git a/spec/operators/skipUntil-spec.js b/spec/operators/skipUntil-spec.js new file mode 100644 index 0000000000..c979b35d0c --- /dev/null +++ b/spec/operators/skipUntil-spec.js @@ -0,0 +1,16 @@ +/* globals describe, it, expect, jasmine */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.skipUntil()', function () { + it('should skip values until another observable notifies', function (done) { + var expected = [5]; + + Observable.timer(0, 10) + .skipUntil(Observable.timer(45)) + .take(1) + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, null, done); + }); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index c9cd2cafc7..9e5d2ffa9d 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -136,6 +136,7 @@ export default class Observable { distinctUntilChanged: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable; distinctUntilKeyChanged: (key: string, compare?: (x: any, y: any) => boolean, thisArg?: any) => Observable; skip: (count: number) => Observable; + skipUntil: (notifier: Observable) => Observable; take: (count: number) => Observable; takeUntil: (observable: Observable) => Observable; partition: (predicate: (x: T) => boolean) => Observable[]; diff --git a/src/Rx.ts b/src/Rx.ts index cb37fed12d..c536104369 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -93,6 +93,7 @@ observableProto.startWith = startWith; import take from './operators/take'; import skip from './operators/skip'; +import skipUntil from './operators/skipUntil'; import takeUntil from './operators/takeUntil'; import filter from './operators/filter'; import distinctUntilChanged from './operators/distinctUntilChanged'; @@ -101,6 +102,7 @@ import distinctUntilKeyChanged from './operators/distinctUntilKeyChanged'; observableProto.take = take; observableProto.skip = skip; observableProto.takeUntil = takeUntil; +observableProto.skipUntil = skipUntil; observableProto.filter = filter; observableProto.distinctUntilChanged = distinctUntilChanged; observableProto.distinctUntilKeyChanged = distinctUntilKeyChanged; diff --git a/src/operators/skipUntil.ts b/src/operators/skipUntil.ts new file mode 100644 index 0000000000..a93c4691bc --- /dev/null +++ b/src/operators/skipUntil.ts @@ -0,0 +1,46 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; + +export default function skipUntil(total) { + return this.lift(new SkipUntilOperator(total)); +} + +export class SkipUntilOperator extends Operator { + constructor(private notifier: Observable) { + super(); + } + + call(observer: Observer): Observer { + return new SkipUntilSubscriber(observer, this.notifier); + } +} + +export class SkipUntilSubscriber extends Subscriber { + private notificationSubscriber: NotificationSubscriber = new NotificationSubscriber(); + + constructor(destination: Observer, private notifier: Observable) { + super(destination); + this.add(this.notifier.subscribe(this.notificationSubscriber)) + } + + _next(x) { + if (this.notificationSubscriber.hasNotified) { + this.destination.next(x); + } + } +} + +export class NotificationSubscriber extends Subscriber { + hasNotified: boolean = false; + + constructor() { + super(null); + } + + _next() { + this.hasNotified = true; + this.unsubscribe(); + } +}