Skip to content

Commit

Permalink
ref: performance fix for v large queues.
Browse files Browse the repository at this point in the history
Fixes #1
  • Loading branch information
zenmumbler committed Jul 17, 2018
1 parent ee1c581 commit 1128f0d
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 8 deletions.
13 changes: 9 additions & 4 deletions packages/streams/src/queue-mixin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* https://github.com/stardazed/sd-streams
*/

import { Queue, QueueImpl } from "./queue";
import { isFiniteNonNegativeNumber } from "./shared-internals";

export const queue_ = Symbol("queue_");
Expand All @@ -16,12 +17,12 @@ export interface QueueElement<V> {
}

export interface QueueContainer<V> {
[queue_]: QueueElement<V>[];
[queue_]: Queue<QueueElement<V>>;
[queueTotalSize_]: number;
}

export interface ByteQueueContainer {
[queue_]: { buffer: ArrayBufferLike, byteOffset: number, byteLength: number }[];
[queue_]: Queue<{ buffer: ArrayBufferLike, byteOffset: number, byteLength: number }>;
[queueTotalSize_]: number;
}

Expand All @@ -46,10 +47,14 @@ export function enqueueValueWithSize<V>(container: QueueContainer<V>, value: V,
export function peekQueueValue<V>(container: QueueContainer<V>) {
// Assert: container has[[queue]] and[[queueTotalSize]] internal slots.
// Assert: container.[[queue]] is not empty.
return container[queue_][0].value;
return container[queue_].front()!.value;
}

export function resetQueue<V>(container: ByteQueueContainer | QueueContainer<V>) {
container[queue_] = [];
container[queue_] = new QueueImpl<any>();
// const q = [] as any;
// q.front = function() { return this[0]; };
// container[queue_] = q;

container[queueTotalSize_] = 0;
}
62 changes: 62 additions & 0 deletions packages/streams/src/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* streams/queue - simple queue type with chunked array backing
* Part of Stardazed
* (c) 2018 by Arthur Langereis - @zenmumbler
* https://github.com/stardazed/sd-streams
*/

const CHUNK_SIZE = 16384;

export interface Queue<T> {
push(t: T): void;
shift(): T | undefined;
front(): T | undefined;
readonly length: number;
}

export class QueueImpl<T> implements Queue<T> {
private readonly chunks_: T[][];
private readChunk_: T[];
private writeChunk_: T[];
private length_: number;

constructor() {
this.chunks_ = [[]];
this.readChunk_ = this.writeChunk_ = this.chunks_[0];
this.length_ = 0;
}

push(t: T): void {
this.writeChunk_.push(t);
this.length_ += 1;
if (this.writeChunk_.length === CHUNK_SIZE) {
this.writeChunk_ = [];
this.chunks_.push(this.writeChunk_);
}
}

front(): T | undefined {
if (this.length_ === 0) {
return undefined;
}
return this.readChunk_[0];
}

shift(): T | undefined {
if (this.length_ === 0) {
return undefined;
}
const t = this.readChunk_.shift();

this.length_ -= 1;
if (this.readChunk_.length === 0 && this.readChunk_ !== this.writeChunk_) {
this.chunks_.shift();
this.readChunk_ = this.chunks_[0];
}
return t;
}

get length() {
return this.length_;
}
}
3 changes: 2 additions & 1 deletion packages/streams/src/readable-byte-stream-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import * as rs from "./readable-internals";
import * as q from "./queue-mixin";
import * as shared from "./shared-internals";
import { ReadableStreamBYOBRequest } from "./readable-stream-byob-request";
import { Queue } from "./queue";

export class ReadableByteStreamController implements rs.ReadableByteStreamController {
[rs.autoAllocateChunkSize_]: number | undefined;
Expand All @@ -23,7 +24,7 @@ export class ReadableByteStreamController implements rs.ReadableByteStreamContro
[rs.started_]: boolean;
[rs.strategyHWM_]: number;

[q.queue_]: { buffer: ArrayBufferLike, byteOffset: number, byteLength: number }[];
[q.queue_]: Queue<{ buffer: ArrayBufferLike, byteOffset: number, byteLength: number }>;
[q.queueTotalSize_]: number;

constructor() {
Expand Down
2 changes: 1 addition & 1 deletion packages/streams/src/readable-internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ export function readableByteStreamControllerFillPullIntoDescriptorFromQueue(cont
const queue = controller[q.queue_];

while (totalBytesToCopyRemaining > 0) {
const headOfQueue = queue[0];
const headOfQueue = queue.front()!;
const bytesToCopy = Math.min(totalBytesToCopyRemaining, headOfQueue.byteLength);
const destStart = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
shared.copyDataBlockBytes(pullIntoDescriptor.buffer, destStart, headOfQueue.buffer, headOfQueue.byteOffset, bytesToCopy);
Expand Down
3 changes: 2 additions & 1 deletion packages/streams/src/readable-stream-default-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import * as rs from "./readable-internals";
import * as shared from "./shared-internals";
import * as q from "./queue-mixin";
import { Queue } from "./queue";

export class ReadableStreamDefaultController implements rs.ReadableStreamDefaultController {
[rs.cancelAlgorithm_]: rs.CancelAlgorithm;
Expand All @@ -20,7 +21,7 @@ export class ReadableStreamDefaultController implements rs.ReadableStreamDefault
[rs.strategySizeAlgorithm_]: shared.SizeAlgorithm;
[rs.started_]: boolean;

[q.queue_]: q.QueueElement<any>[];
[q.queue_]: Queue<q.QueueElement<any>>;
[q.queueTotalSize_]: number;

constructor() {
Expand Down
3 changes: 2 additions & 1 deletion packages/streams/src/writable-stream-default-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import * as ws from "./writable-internals";
import * as shared from "./shared-internals";
import * as q from "./queue-mixin";
import { Queue } from "./queue";

export class WritableStreamDefaultController implements ws.WritableStreamDefaultController {
[ws.abortAlgorithm_]: ws.AbortAlgorithm;
Expand All @@ -18,7 +19,7 @@ export class WritableStreamDefaultController implements ws.WritableStreamDefault
[ws.strategySizeAlgorithm_]: shared.SizeAlgorithm;
[ws.writeAlgorithm_]: ws.WriteAlgorithm;

[q.queue_]: q.QueueElement<ws.WriteRecord | "close">[];
[q.queue_]: Queue<q.QueueElement<ws.WriteRecord | "close">>;
[q.queueTotalSize_]: number;

constructor() {
Expand Down

0 comments on commit 1128f0d

Please sign in to comment.