Skip to content

Commit

Permalink
refactor(bufferCount): Smaller impl
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Sep 14, 2020
1 parent 5cad812 commit 143ee68
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 137 deletions.
167 changes: 69 additions & 98 deletions src/internal/operators/bufferCount.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Operator } from '../Operator';
/** @prettier */
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { OperatorFunction, TeardownLogic } from '../types';
import { OperatorFunction } from '../types';
import { lift } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';

/**
* Buffers the source Observable values until the size hits the maximum
Expand Down Expand Up @@ -59,100 +60,70 @@ import { lift } from '../util/lift';
* @name bufferCount
*/
export function bufferCount<T>(bufferSize: number, startBufferEvery: number | null = null): OperatorFunction<T, T[]> {
return function bufferCountOperatorFunction(source: Observable<T>) {
return lift(source, new BufferCountOperator<T>(bufferSize, startBufferEvery));
};
}

class BufferCountOperator<T> implements Operator<T, T[]> {
private subscriberClass: any;

constructor(private bufferSize: number, private startBufferEvery: number | null) {
if (!startBufferEvery || bufferSize === startBufferEvery) {
this.subscriberClass = BufferCountSubscriber;
} else {
this.subscriberClass = BufferSkipCountSubscriber;
}
}

call(subscriber: Subscriber<T[]>, source: any): TeardownLogic {
return source.subscribe(new this.subscriberClass(subscriber, this.bufferSize, this.startBufferEvery));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class BufferCountSubscriber<T> extends Subscriber<T> {
private buffer: T[] = [];

constructor(destination: Subscriber<T[]>, private bufferSize: number) {
super(destination);
}

protected _next(value: T): void {
const buffer = this.buffer;

buffer.push(value);

if (buffer.length == this.bufferSize) {
this.destination.next(buffer);
this.buffer = [];
}
}

protected _complete(): void {
const buffer = this.buffer;
if (buffer.length > 0) {
this.destination.next(buffer);
}
super._complete();
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class BufferSkipCountSubscriber<T> extends Subscriber<T> {
private buffers: Array<T[]> = [];
private count: number = 0;

constructor(destination: Subscriber<T[]>, private bufferSize: number, private startBufferEvery: number) {
super(destination);
}

protected _next(value: T): void {
const { bufferSize, startBufferEvery, buffers, count } = this;

this.count++;
if (count % startBufferEvery === 0) {
buffers.push([]);
}

for (let i = buffers.length; i--; ) {
const buffer = buffers[i];
buffer.push(value);
if (buffer.length === bufferSize) {
buffers.splice(i, 1);
this.destination.next(buffer);
}
}
}

protected _complete(): void {
const { buffers, destination } = this;

while (buffers.length > 0) {
let buffer = buffers.shift()!;
if (buffer.length > 0) {
destination.next(buffer);
}
}
super._complete();
}

// If no `startBufferEvery` value was supplied, then we're
// opening and closing on the bufferSize itself.
startBufferEvery = startBufferEvery ?? bufferSize;

return (source: Observable<T>) =>
lift(source, function (this: Subscriber<T[]>, source: Observable<T>) {
const subscriber = this;
let buffers: T[][] = [];
let count = 0;

source.subscribe(
new OperatorSubscriber(
subscriber,
(value) => {
let toEmit: T[][] | null = null;

// Check to see if we need to start a buffer.
// This will start one at the first value, and then
// a new one every N after that.
if (count++ % startBufferEvery! === 0) {
buffers.push([]);
}

// Push our value into our active buffers.
for (const buffer of buffers) {
buffer.push(value);
// Check to see if we're over the bufferSize
// if we are, record it so we can emit it later.
// If we emitted it now and removed it, it would
// mutate the `buffers` array while we're looping
// over it.
if (bufferSize <= buffer.length) {
toEmit = toEmit ?? [];
toEmit.push(buffer);
}
}

if (toEmit) {
// We have found some buffers that are over the
// `bufferSize`. Emit them, and remove them from our
// buffers list.
for (const buffer of toEmit) {
const index = buffers.indexOf(buffer);
if (0 <= index) {
buffers.splice(index, 1);
}
subscriber.next(buffer);
}
}
},
undefined,
() => {
// When the source completes, emit all of our
// active buffers.
for (const buffer of buffers) {
subscriber.next(buffer);
}
subscriber.complete();
},
() => {
// Clean up our memory when we teardown
buffers = null!;
}
)
);
});
}
88 changes: 49 additions & 39 deletions src/internal/operators/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { lift } from '../util/lift';
import { Observable } from '../Observable';
import { from } from '../observable/from';
import { createErrorClass } from '../util/createErrorClass';
import { OperatorSubscriber } from './OperatorSubscriber';

export interface TimeoutConfig<T, R = T, M = unknown> {
/**
Expand Down Expand Up @@ -332,62 +333,71 @@ export function timeout<T, R, M>(

return lift(source, function (this: Subscriber<T | R>, source: Observable<T>) {
const subscriber = this;
const subscription = new Subscription();
let innerSub: Subscription;
let timerSubscription: Subscription | null = null;
// This subscription encapsulates our subscription to the
// source for this operator. We're capturing it separately,
// because if there is a `with` observable to fail over to,
// we want to unsubscribe from our original subscription, and
// hand of the subscription to that one.
let originalSourceSubscription: Subscription;
// The subscription for our timeout timer. This changes
// every time get get a new value.
let timerSubscription: Subscription;
// A bit of state we pass to our with and error factories to
// tell what the last value we saw was.
let lastValue: T | null = null;
// A bit of state we pass to the with and error factories to
// tell how many values we have seen so far.
let seen = 0;
const startTimer = (delay: number) => {
subscription.add(
subscriber.add(
(timerSubscription = scheduler!.schedule(() => {
let withObservable: Observable<R>;
const info: TimeoutInfo<T, M> = {
meta,
lastValue,
seen,
};
try {
withObservable = from(_with!(info));
withObservable = from(
_with!({
meta,
lastValue,
seen,
})
);
} catch (err) {
subscriber.error(err);
return;
}
innerSub.unsubscribe();
subscription.add(withObservable.subscribe(subscriber));
originalSourceSubscription.unsubscribe();
withObservable.subscribe(subscriber);
}, delay))
);
};

subscription.add(
(innerSub = source.subscribe({
next: (value) => {
timerSubscription?.unsubscribe();
timerSubscription = null;
seen++;
lastValue = value;
if (each != null && each > 0) {
startTimer(each);
subscriber.add(
(originalSourceSubscription = source.subscribe(
new OperatorSubscriber(
subscriber,
(value) => {
// clear the timer so we can emit and start another one.
timerSubscription.unsubscribe();
seen++;
// Emit
subscriber.next((lastValue = value));
each && each > 0 && startTimer(each);
},
undefined,
undefined,
() => {
// Be sure not to hold the last value in memory after unsubscription
// it could be quite large.
lastValue = null;
}
subscriber.next(value);
},
error: (err) => subscriber.error(err),
complete: () => subscriber.complete(),
}))
)
))
);

let firstTimer: number;
if (first != null) {
if (typeof first === 'number') {
firstTimer = first;
} else {
firstTimer = +first - scheduler!.now();
}
} else {
firstTimer = each!;
}
startTimer(firstTimer);

return subscription;
// Intentionally terse code.
// If `first` was provided, and it's a number, then use it.
// If `first` was provided and it's not a number, it's a Date, and we get the difference between it and "now".
// If `first` was not provided at all, then our first timer will be the value from `each`.
startTimer(first != null ? (typeof first === 'number' ? first : +first - scheduler!.now()) : each!);
});
};
}
Expand Down

0 comments on commit 143ee68

Please sign in to comment.