Skip to content

Commit

Permalink
Add stream block event
Browse files Browse the repository at this point in the history
This is useful for example to access how many values were de/encoded.
  • Loading branch information
mtth committed May 2, 2021
1 parent 69599b1 commit 75a8fac
Show file tree
Hide file tree
Showing 4 changed files with 382 additions and 79 deletions.
17 changes: 13 additions & 4 deletions lib/containers.js
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ BlockDecoder.prototype._writeChunk = function (chunk, encoding, cb) {
nBlocks++;
this._decompress(
block.data,
this._createBlockCallback(block.count, chunkCb)
this._createBlockCallback(block.data.length, block.count, chunkCb)
);
}
chunkCb();
Expand All @@ -255,7 +255,7 @@ BlockDecoder.prototype._writeChunk = function (chunk, encoding, cb) {
}
};

BlockDecoder.prototype._createBlockCallback = function (count, cb) {
BlockDecoder.prototype._createBlockCallback = function (size, count, cb) {
var self = this;
var index = this._index++;

Expand All @@ -266,6 +266,7 @@ BlockDecoder.prototype._createBlockCallback = function (count, cb) {
self.emit('error', err);
cb();
} else {
self.emit('block', new BlockInfo(count, data.length, size));
self._queue.push(new BlockData(index, data, cb, count));
if (self._needPush) {
self._read();
Expand Down Expand Up @@ -525,7 +526,7 @@ BlockEncoder.prototype._write = function (val, encoding, cb) {
BlockEncoder.prototype._flushChunk = function (pos, cb) {
var tap = this._tap;
pos = pos || tap.pos;
this._compress(tap.buf.slice(0, pos), this._createBlockCallback(cb));
this._compress(tap.buf.slice(0, pos), this._createBlockCallback(pos, cb));
this._blockCount = 0;
};

Expand All @@ -551,7 +552,7 @@ BlockEncoder.prototype._read = function () {
}
};

BlockEncoder.prototype._createBlockCallback = function (cb) {
BlockEncoder.prototype._createBlockCallback = function (size, cb) {
var self = this;
var index = this._index++;
var count = this._blockCount;
Expand All @@ -565,6 +566,7 @@ BlockEncoder.prototype._createBlockCallback = function (cb) {
return;
}
self._pending--;
self.emit('block', new BlockInfo(count, size, data.length));
self._queue.push(new BlockData(index, data, cb, count));
if (self._needPush) {
self._needPush = false;
Expand All @@ -576,6 +578,13 @@ BlockEncoder.prototype._createBlockCallback = function (cb) {

// Helpers.

/** Summary information about a block. */
function BlockInfo(count, raw, compressed) {
this.valueCount = count;
this.rawDataLength = raw;
this.compressedDataLength = compressed;
}

/**
* An indexed block.
*
Expand Down
Loading

0 comments on commit 75a8fac

Please sign in to comment.