Performance
This toolkit is used to process huge files, so even a tiny per-operation overhead adds up quickly. For example, a microsecond over 1 billion operations will add ~16.5 minutes. A millisecond over 1 billion operations will add ~11.5 days.
That's why performance considerations played a major role in the design and implementation of stream-json. Below is a list of best practices for users of the toolkit.
Every stage in a stream-based data processing pipeline introduces latency. Try to minimize the size of your pipeline. While it is tempting to use a lot of small filters/transforms, try to combine them into one component, if possible (the examples use stream-chain for simplicity):
const {chain} = require('stream-chain');
// fine-grained, but less efficient
chain([
sourceStream,
// filters
data => data.key % 2 !== 0 ? data : null,
data => data.value.important ? data : null,
// transforms
data => data.value.price,
price => price * taxRate
]);
// more efficient
chain([
sourceStream,
data => {
if (data.key % 2 !== 0 && data.value.important) {
return data.value.price * taxRate;
}
return null; // ignore
}
]);
In general, boundaries between streams are relatively expensive and should be reserved for components that generate a varying number of items, where we can take advantage of a stream's ability to handle back-pressure correctly. Otherwise, simple function calls are more efficient.
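For instance, a step that expands one input item into a varying number of output items benefits from being its own stage, because back-pressure then applies to its output, while one-to-one steps can be folded into a single function. A minimal sketch, assuming input objects with a hypothetical records array and relying on stream-chain's support for generator functions:
const {chain} = require('stream-chain');

chain([
  sourceStream,
  // one-to-many: kept as its own stage, so the stream machinery
  // can regulate its varying output via back-pressure
  function* (data) {
    for (const record of data.value.records) {
      yield record;
    }
  },
  // one-to-one steps combined into a single function call
  record => record.important ? record.price * taxRate : null
]);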
The less traffic goes through a pipeline, the faster it is. If you use filters, try to remove as many items as possible, as early as possible, by arranging filters properly:
// let's assume that we have a small number of important objects,
// and valid() is an expensive function to calculate
// fine-grained, but less efficient
chain([
sourceStream,
// filters
data => valid(data) ? data : null,
data => data.value.important ? data : null
]);
// better
chain([
sourceStream,
// filters
data => data.value.important ? data : null,
data => valid(data) ? data : null
]);
// best
chain([
sourceStream,
// filters
data => data.value.important && valid(data) ? data : null
]);
The filters should be arranged to minimize the expense of filtering. The same goes for transforms.
While Parser streams values just fine, sometimes we need a value itself. Parser can pack values and send out xxxValue tokens itself. There is no need to do it in your custom code, unless it provides some discernible benefit.
Sometimes, when we have packed values, we don't need them to be streamed as well. Suppressing the streamed parts can reduce traffic. Parser can be told to suppress streaming values using the streamXXX options, which work only when the corresponding values are packed:
const parser1 = new Parser();
// streams values like that:
// {name: 'startString'}
// {name: 'stringChunk', value: 'a'} // zero or more chunks
// {name: 'stringChunk', value: 'b'}
// {name: 'endString'}
// {name: 'stringValue', value: 'ab'}
// In reality, chunks are unlikely to be just one character long.
const parser2 = new Parser({packValues: false});
// streams values like that:
// {name: 'startString'}
// {name: 'stringChunk', value: 'a'} // zero or more chunks
// {name: 'stringChunk', value: 'b'}
// {name: 'endString'}
const parser3 = new Parser({streamValues: false});
// streams values like that:
// {name: 'stringValue', value: 'ab'}
Some components downstream may have special requirements for a stream. For example, filters may want key values to be present. Replace may have additional requirements described below. Stringer by default uses value chunks, but it can be switched to use packed values as described below.
The main module returns a function, which creates a parser decorated with emit(). Use alternatives if this functionality is not required:
const makeParser = require('stream-json');
makeParser().pipe(someFilter); // token events are not used
// better #1
const {Parser} = require('stream-json');
new Parser().pipe(someFilter);
// better #2
const {parser} = require('stream-json');
parser().pipe(someFilter);
(Since 1.6.0) If you deal with a strict JSONL (or NDJSON) format, and convert token streams to JavaScript objects using streamers, use jsonl/Parser to improve performance.
const makeParser = require('stream-json');
const {streamValues} = require('stream-json/streamers/StreamValues');
chain([
makeParser({jsonStreaming: true}),
streamValues(),
someConsumer
]);
// more efficient
const {parser: jsonlParser} = require('stream-json/jsonl/Parser');
chain([
jsonlParser(),
someConsumer
]);
A common use case is to select just one item from a stream. Frequently, after that item has been selected no other items can match, yet the filter is still applied to them. This is especially common for filters specified by a string (a direct match). One way to eliminate this inefficiency is to set {once: true} in the filter's options.
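For example, here is a minimal sketch that picks a single sub-object and stops filtering afterwards (the path 'data.items' and the file name are hypothetical):
const fs = require('fs');
const {chain} = require('stream-chain');
const {parser} = require('stream-json');
const {pick} = require('stream-json/filters/Pick');
const {streamValues} = require('stream-json/streamers/StreamValues');

chain([
  fs.createReadStream('sample.json'),
  parser(),
  // once the first match is fully processed, the filter stops checking tokens
  pick({filter: 'data.items', once: true}),
  streamValues(),
  data => console.log(data.value)
]);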
Replace can generate substreams by itself:
- A replacement substream provided by a user.
- Property keys can be generated on a replacement.
Ignore is based on Replace.
The former case is 100% controlled by a user, while the latter case can include a streaming part:
const replace1 = new Replace();
// can generate keys like that:
// {name: 'startKey'}
// {name: 'stringChunk', value: 'a'}
// {name: 'endKey'}
// {name: 'keyValue', value: 'a'}
const replace2 = new Replace({streamKeys: false});
// can generate keys like that:
// {name: 'keyValue', value: 'a'}
Usually, we select the same style of values across the whole pipeline.
All streamers support objectFilter to control whether incoming objects are assembled into JavaScript objects or discarded without assembling. Depending on many factors, it can be more efficient to filter streams at this level.
The following things should be considered:
- If a property required to make a filtering decision usually comes last, the whole object will be assembled before we can render a decision. In this case, it is unlikely to gain any performance benefit.
- The objectFilter function will be called on every update of an object being assembled. If a filtering function is relatively expensive, it could be more beneficial to filter the stream after an object has been assembled.
// variant #1
chain([
sourceStream,
new StreamArray({objectFilter: asm => {
const value = asm.current;
if (value && value.hasOwnProperty('important')) {
return value.important;
}
// return undefined; // we are undecided yet
}})
]);
// variant #2
chain([
sourceStream,
new StreamArray(),
data => {
  const value = data.value;
  return value && value.important ? data : null;
}
]);
Analyze your data stream, or even benchmark it, in order to decide on the optimal way to filter the stream.
Utf8Stream adds value only when it reads from binary buffers. If its input is string data, it is essentially a pass-through and can be excluded from a pipeline.
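A sketch illustrating both cases (assuming Utf8Stream is taken from stream-json/utils/Utf8Stream, and someTextConsumer is a hypothetical component that expects text):
const fs = require('fs');
const {chain} = require('stream-chain');
const Utf8Stream = require('stream-json/utils/Utf8Stream');

// binary input: Utf8Stream keeps multi-byte UTF-8 sequences intact
chain([
  fs.createReadStream('sample.json'), // emits Buffers
  new Utf8Stream(),
  someTextConsumer
]);

// string input: Utf8Stream would be a pass-through, so we drop it
chain([
  fs.createReadStream('sample.json', {encoding: 'utf8'}), // emits strings
  someTextConsumer
]);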
withParser() and the static methods of streaming components with the same name return a chain object created by stream-chain. A chain object is an instance of Duplex, which wraps a pipeline adding an extra level of indirection. This overhead can be easily avoided:
// variant #1
const pipeline = StreamArray.withParser();
fs.createReadStream('sample.json').pipe(pipeline);
pipeline.on('end', () => console.log('done!'));
// variant #2: more verbose, yet slightly more efficient
const pipeline = StreamArray.withParser();
fs.createReadStream('sample.json').pipe(pipeline.input);
pipeline.output.on('end', () => console.log('done!'));
As you can see, a chain object looks like a Source of 0.x on steroids and can be used as such.
While Assembler comes with consume(data), it is easy to do it yourself without calling a function. The common pattern for Assembler-like objects is simple:
data => asm[data.name] && asm[data.name](data.value)
Consider it when creating your own components that incorporate Assembler or similar objects.
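A minimal sketch of that pattern, feeding parser tokens directly into an Assembler instance (the file name is hypothetical):
const fs = require('fs');
const {parser} = require('stream-json');
const Assembler = require('stream-json/Assembler');

const asm = new Assembler();
const tokens = fs.createReadStream('sample.json').pipe(parser());

// dispatch each token to the assembler method of the same name, if any
tokens.on('data', data => asm[data.name] && asm[data.name](data.value));
tokens.on('end', () => console.log(asm.current)); // the fully assembled value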
Stringer uses value chunks (stringChunk and numberChunk) to produce its output. If streaming of values was disabled in a pipeline with {streamValues: false} or by some other means, it will break Stringer. In this case, it should be switched to use packed values, which can be done with the following options: useValues, useKeyValues, useStringValues, and useNumberValues. Always make sure that a constructed pipeline is consistent.
Both Emitter and emit() are helpers. If they prove to be a bottleneck, we can easily avoid using them:
// variant #1
const emitter = new Emitter();
sourceStream.pipe(emitter);
emitter.on('startObject', () => console.log('object!'));
// variant #2
emit(sourceStream);
sourceStream.on('startObject', () => console.log('object!'));
// more efficient variant #3
sourceStream.on('data', data => data.name === 'startObject' && console.log('object!'));