diff --git a/spec/operators/groupBy-spec.ts b/spec/operators/groupBy-spec.ts index 6e4adf2771..6f1eb8bdae 100644 --- a/spec/operators/groupBy-spec.ts +++ b/spec/operators/groupBy-spec.ts @@ -1,6 +1,6 @@ import {expect} from 'chai'; import * as Rx from '../../dist/cjs/Rx'; -import {GroupedObservable} from '../../dist/cjs/operator/groupBy'; +import {GroupedObservable} from '../../dist/cjs/operators/groupBy'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports declare const { asDiagram }; diff --git a/src/operator/groupBy.ts b/src/operator/groupBy.ts index de801caf17..9ceb6fe34f 100644 --- a/src/operator/groupBy.ts +++ b/src/operator/groupBy.ts @@ -1,10 +1,7 @@ -import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; + import { Observable } from '../Observable'; -import { Operator } from '../Operator'; import { Subject } from '../Subject'; -import { Map } from '../util/Map'; -import { FastMap } from '../util/FastMap'; +import { groupBy as higherOrder, GroupedObservable } from '../operators/groupBy'; /* tslint:disable:max-line-length */ export function groupBy(this: Observable, keySelector: (value: T) => K): Observable>; @@ -84,210 +81,5 @@ export function groupBy(this: Observable, keySelector: (value: T) => elementSelector?: ((value: T) => R) | void, durationSelector?: (grouped: GroupedObservable) => Observable, subjectSelector?: () => Subject): Observable> { - return this.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector)); -} - -export interface RefCountSubscription { - count: number; - unsubscribe: () => void; - closed: boolean; - attemptedToUnsubscribe: boolean; -} - -class GroupByOperator implements Operator> { - constructor(private keySelector: (value: T) => K, - private elementSelector?: ((value: T) => R) | void, - private durationSelector?: (grouped: GroupedObservable) => Observable, - private subjectSelector?: () => Subject) { - } - - call(subscriber: Subscriber>, source: any): any { - return source.subscribe(new GroupBySubscriber( - subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector - )); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class GroupBySubscriber extends Subscriber implements RefCountSubscription { - private groups: Map> = null; - public attemptedToUnsubscribe: boolean = false; - public count: number = 0; - - constructor(destination: Subscriber>, - private keySelector: (value: T) => K, - private elementSelector?: ((value: T) => R) | void, - private durationSelector?: (grouped: GroupedObservable) => Observable, - private subjectSelector?: () => Subject) { - super(destination); - } - - protected _next(value: T): void { - let key: K; - try { - key = this.keySelector(value); - } catch (err) { - this.error(err); - return; - } - - this._group(value, key); - } - - private _group(value: T, key: K) { - let groups = this.groups; - - if (!groups) { - groups = this.groups = typeof key === 'string' ? new FastMap() : new Map(); - } - - let group = groups.get(key); - - let element: R; - if (this.elementSelector) { - try { - element = this.elementSelector(value); - } catch (err) { - this.error(err); - } - } else { - element = value; - } - - if (!group) { - group = this.subjectSelector ? this.subjectSelector() : new Subject(); - groups.set(key, group); - const groupedObservable = new GroupedObservable(key, group, this); - this.destination.next(groupedObservable); - if (this.durationSelector) { - let duration: any; - try { - duration = this.durationSelector(new GroupedObservable(key, >group)); - } catch (err) { - this.error(err); - return; - } - this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this))); - } - } - - if (!group.closed) { - group.next(element); - } - } - - protected _error(err: any): void { - const groups = this.groups; - if (groups) { - groups.forEach((group, key) => { - group.error(err); - }); - - groups.clear(); - } - this.destination.error(err); - } - - protected _complete(): void { - const groups = this.groups; - if (groups) { - groups.forEach((group, key) => { - group.complete(); - }); - - groups.clear(); - } - this.destination.complete(); - } - - removeGroup(key: K): void { - this.groups.delete(key); - } - - unsubscribe() { - if (!this.closed) { - this.attemptedToUnsubscribe = true; - if (this.count === 0) { - super.unsubscribe(); - } - } - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class GroupDurationSubscriber extends Subscriber { - constructor(private key: K, - private group: Subject, - private parent: GroupBySubscriber) { - super(group); - } - - protected _next(value: T): void { - this.complete(); - } - - protected _unsubscribe() { - const { parent, key } = this; - this.key = this.parent = null; - if (parent) { - parent.removeGroup(key); - } - } -} - -/** - * An Observable representing values belonging to the same group represented by - * a common key. The values emitted by a GroupedObservable come from the source - * Observable. The common key is available as the field `key` on a - * GroupedObservable instance. - * - * @class GroupedObservable - */ -export class GroupedObservable extends Observable { - constructor(public key: K, - private groupSubject: Subject, - private refCountSubscription?: RefCountSubscription) { - super(); - } - - protected _subscribe(subscriber: Subscriber) { - const subscription = new Subscription(); - const {refCountSubscription, groupSubject} = this; - if (refCountSubscription && !refCountSubscription.closed) { - subscription.add(new InnerRefCountSubscription(refCountSubscription)); - } - subscription.add(groupSubject.subscribe(subscriber)); - return subscription; - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class InnerRefCountSubscription extends Subscription { - constructor(private parent: RefCountSubscription) { - super(); - parent.count++; - } - - unsubscribe() { - const parent = this.parent; - if (!parent.closed && !this.closed) { - super.unsubscribe(); - parent.count -= 1; - if (parent.count === 0 && parent.attemptedToUnsubscribe) { - parent.unsubscribe(); - } - } - } + return higherOrder(keySelector, elementSelector as any, durationSelector, subjectSelector)(this); } diff --git a/src/operators/groupBy.ts b/src/operators/groupBy.ts new file mode 100644 index 0000000000..3ed28a6966 --- /dev/null +++ b/src/operators/groupBy.ts @@ -0,0 +1,295 @@ +import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; +import { Observable } from '../Observable'; +import { Operator } from '../Operator'; +import { Subject } from '../Subject'; +import { Map } from '../util/Map'; +import { FastMap } from '../util/FastMap'; +import { OperatorFunction } from '../interfaces'; + +/* tslint:disable:max-line-length */ +export function groupBy(keySelector: (value: T) => K): OperatorFunction>; +export function groupBy(keySelector: (value: T) => K, elementSelector: void, durationSelector: (grouped: GroupedObservable) => Observable): OperatorFunction>; +export function groupBy(keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable) => Observable): OperatorFunction>; +export function groupBy(keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable) => Observable, subjectSelector?: () => Subject): OperatorFunction>; +/* tslint:enable:max-line-length */ + +/** + * Groups the items emitted by an Observable according to a specified criterion, + * and emits these grouped items as `GroupedObservables`, one + * {@link GroupedObservable} per group. + * + * + * + * @example Group objects by id and return as array + * Observable.of({id: 1, name: 'aze1'}, + * {id: 2, name: 'sf2'}, + * {id: 2, name: 'dg2'}, + * {id: 1, name: 'erg1'}, + * {id: 1, name: 'df1'}, + * {id: 2, name: 'sfqfb2'}, + * {id: 3, name: 'qfs3'}, + * {id: 2, name: 'qsgqsfg2'} + * ) + * .groupBy(p => p.id) + * .flatMap( (group$) => group$.reduce((acc, cur) => [...acc, cur], [])) + * .subscribe(p => console.log(p)); + * + * // displays: + * // [ { id: 1, name: 'aze1' }, + * // { id: 1, name: 'erg1' }, + * // { id: 1, name: 'df1' } ] + * // + * // [ { id: 2, name: 'sf2' }, + * // { id: 2, name: 'dg2' }, + * // { id: 2, name: 'sfqfb2' }, + * // { id: 2, name: 'qsgqsfg2' } ] + * // + * // [ { id: 3, name: 'qfs3' } ] + * + * @example Pivot data on the id field + * Observable.of({id: 1, name: 'aze1'}, + * {id: 2, name: 'sf2'}, + * {id: 2, name: 'dg2'}, + * {id: 1, name: 'erg1'}, + * {id: 1, name: 'df1'}, + * {id: 2, name: 'sfqfb2'}, + * {id: 3, name: 'qfs1'}, + * {id: 2, name: 'qsgqsfg2'} + * ) + * .groupBy(p => p.id, p => p.name) + * .flatMap( (group$) => group$.reduce((acc, cur) => [...acc, cur], ["" + group$.key])) + * .map(arr => ({'id': parseInt(arr[0]), 'values': arr.slice(1)})) + * .subscribe(p => console.log(p)); + * + * // displays: + * // { id: 1, values: [ 'aze1', 'erg1', 'df1' ] } + * // { id: 2, values: [ 'sf2', 'dg2', 'sfqfb2', 'qsgqsfg2' ] } + * // { id: 3, values: [ 'qfs1' ] } + * + * @param {function(value: T): K} keySelector A function that extracts the key + * for each item. + * @param {function(value: T): R} [elementSelector] A function that extracts the + * return element for each item. + * @param {function(grouped: GroupedObservable): Observable} [durationSelector] + * A function that returns an Observable to determine how long each group should + * exist. + * @return {Observable>} An Observable that emits + * GroupedObservables, each of which corresponds to a unique key value and each + * of which emits those items from the source Observable that share that key + * value. + * @method groupBy + * @owner Observable + */ +export function groupBy(keySelector: (value: T) => K, + elementSelector?: ((value: T) => R) | void, + durationSelector?: (grouped: GroupedObservable) => Observable, + subjectSelector?: () => Subject): OperatorFunction> { + return (source: Observable) => + source.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector)); +} + +export interface RefCountSubscription { + count: number; + unsubscribe: () => void; + closed: boolean; + attemptedToUnsubscribe: boolean; +} + +class GroupByOperator implements Operator> { + constructor(private keySelector: (value: T) => K, + private elementSelector?: ((value: T) => R) | void, + private durationSelector?: (grouped: GroupedObservable) => Observable, + private subjectSelector?: () => Subject) { + } + + call(subscriber: Subscriber>, source: any): any { + return source.subscribe(new GroupBySubscriber( + subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector + )); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class GroupBySubscriber extends Subscriber implements RefCountSubscription { + private groups: Map> = null; + public attemptedToUnsubscribe: boolean = false; + public count: number = 0; + + constructor(destination: Subscriber>, + private keySelector: (value: T) => K, + private elementSelector?: ((value: T) => R) | void, + private durationSelector?: (grouped: GroupedObservable) => Observable, + private subjectSelector?: () => Subject) { + super(destination); + } + + protected _next(value: T): void { + let key: K; + try { + key = this.keySelector(value); + } catch (err) { + this.error(err); + return; + } + + this._group(value, key); + } + + private _group(value: T, key: K) { + let groups = this.groups; + + if (!groups) { + groups = this.groups = typeof key === 'string' ? new FastMap() : new Map(); + } + + let group = groups.get(key); + + let element: R; + if (this.elementSelector) { + try { + element = this.elementSelector(value); + } catch (err) { + this.error(err); + } + } else { + element = value; + } + + if (!group) { + group = this.subjectSelector ? this.subjectSelector() : new Subject(); + groups.set(key, group); + const groupedObservable = new GroupedObservable(key, group, this); + this.destination.next(groupedObservable); + if (this.durationSelector) { + let duration: any; + try { + duration = this.durationSelector(new GroupedObservable(key, >group)); + } catch (err) { + this.error(err); + return; + } + this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this))); + } + } + + if (!group.closed) { + group.next(element); + } + } + + protected _error(err: any): void { + const groups = this.groups; + if (groups) { + groups.forEach((group, key) => { + group.error(err); + }); + + groups.clear(); + } + this.destination.error(err); + } + + protected _complete(): void { + const groups = this.groups; + if (groups) { + groups.forEach((group, key) => { + group.complete(); + }); + + groups.clear(); + } + this.destination.complete(); + } + + removeGroup(key: K): void { + this.groups.delete(key); + } + + unsubscribe() { + if (!this.closed) { + this.attemptedToUnsubscribe = true; + if (this.count === 0) { + super.unsubscribe(); + } + } + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class GroupDurationSubscriber extends Subscriber { + constructor(private key: K, + private group: Subject, + private parent: GroupBySubscriber) { + super(group); + } + + protected _next(value: T): void { + this.complete(); + } + + protected _unsubscribe() { + const { parent, key } = this; + this.key = this.parent = null; + if (parent) { + parent.removeGroup(key); + } + } +} + +/** + * An Observable representing values belonging to the same group represented by + * a common key. The values emitted by a GroupedObservable come from the source + * Observable. The common key is available as the field `key` on a + * GroupedObservable instance. + * + * @class GroupedObservable + */ +export class GroupedObservable extends Observable { + constructor(public key: K, + private groupSubject: Subject, + private refCountSubscription?: RefCountSubscription) { + super(); + } + + protected _subscribe(subscriber: Subscriber) { + const subscription = new Subscription(); + const {refCountSubscription, groupSubject} = this; + if (refCountSubscription && !refCountSubscription.closed) { + subscription.add(new InnerRefCountSubscription(refCountSubscription)); + } + subscription.add(groupSubject.subscribe(subscriber)); + return subscription; + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class InnerRefCountSubscription extends Subscription { + constructor(private parent: RefCountSubscription) { + super(); + parent.count++; + } + + unsubscribe() { + const parent = this.parent; + if (!parent.closed && !this.closed) { + super.unsubscribe(); + parent.count -= 1; + if (parent.count === 0 && parent.attemptedToUnsubscribe) { + parent.unsubscribe(); + } + } + } +} diff --git a/src/operators/index.ts b/src/operators/index.ts index 2921296c4e..84dfabf597 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -29,6 +29,7 @@ export { finalize } from './finalize'; export { find } from './find'; export { findIndex } from './findIndex'; export { first } from './first'; +export { groupBy } from './groupBy'; export { ignoreElements } from './ignoreElements'; export { map } from './map'; export { materialize } from './materialize';