Skip to content

Commit

Permalink
perf(distinct): increase perf from 60% of Rx4 to 1000% Rx4
Browse files Browse the repository at this point in the history
- removed closure
- custom tryCatching (all in _next)
- removed HashSet impl in favor of plain array
  • Loading branch information
benlesh committed Jan 28, 2016
1 parent e97fc7b commit d026c41
Showing 1 changed file with 39 additions and 54 deletions.
93 changes: 39 additions & 54 deletions src/operator/distinct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ import {Observable} from '../Observable';
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

/**
* Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.
Expand All @@ -28,84 +26,71 @@ class DistinctOperator<T, R> implements Operator<T, R> {
}
}

class HashSet<T> {
private set: Array<T> = [];

constructor(private compare: (x: T, y: T) => boolean) {
}

private has(item: T): boolean {
for (var i = 0; i < this.set.length; i++) {
if (this.compare(this.set[i], item)) {
return true;
}
}

return false;
}

push(item: T): boolean {
if (this.has(item)) {
return false;
} else {
this.set.push(item);
return true;
}
}

flush(): void {
this.set = [];
}
}

class DistinctSubscriber<T> extends Subscriber<T> {
private hashSet: HashSet<T>;
export class DistinctSubscriber<T> extends Subscriber<T> {
private values: any[] = [];
private flushSubscription: Subscription;

constructor(destination: Subscriber<T>, compare: (x: T, y: T) => boolean, flushes: Observable<any>) {
super(destination);
if (typeof compare === 'function') {
this.compare = compare;
}
this.hashSet = new HashSet(this.compare);

if (flushes) {
this.flushSubscription = flushes.subscribe(() => this.hashSet.flush());
this.add(this.flushSubscription = flushes.subscribe(new FlushSubscriber(this)));
}
}

private compare(x: T, y: T): boolean {
return x === y;
flush() {
this.values.length = 0;
}

private disposeFlushSubscription(): void {
if (this.flushSubscription) {
this.flushSubscription.unsubscribe();
}
private compare(x: T, y: T): boolean {
return x === y;
}

protected _next(value: T): void {
let result: any = false;

result = tryCatch(this.hashSet.push.bind(this.hashSet))(value);
if (result === errorObject) {
this.destination.error(errorObject.e);
let found = false;

This comment has been minimized.

Copy link
@justinwoo

justinwoo Jan 28, 2016

Contributor

Do we need this var? We return whenever we set it to true anyway, right?

This comment has been minimized.

Copy link
@benlesh

benlesh Jan 28, 2016

Author Member

Oh, good call. It was development cruft.

This comment has been minimized.

Copy link
@benlesh

benlesh Jan 28, 2016

Author Member

It's already been merged, so we'll just need to make a note of it and remove it later.

const values = this.values;
const len = values.length;
try {
for (let i = 0; i < len; i++) {
if (this.compare(values[i], value)) {
found = true;
return;
}
}
} catch (err) {
this.destination.error(err);
return;
}

if (result) {
this.destination.next(value);
}
this.values.push(value);
this.destination.next(value);
}

protected _complete(): void {
this.disposeFlushSubscription();
super._complete();
}

unsubscribe(): void {
this.disposeFlushSubscription();
super.unsubscribe();
}

}

export class FlushSubscriber extends Subscriber<any> {
constructor(private parent: DistinctSubscriber<any>) {
super();
}

next() {
this.parent.flush();
}

complete() {
// noop
}

error(err: any) {
this.parent.error(err);
}
}

0 comments on commit d026c41

Please sign in to comment.