From be89aafd95296e9721e124b77eee7c745e1c1e97 Mon Sep 17 00:00:00 2001 From: isaacs Date: Mon, 2 Aug 2021 16:21:35 -0700 Subject: [PATCH] WriteEntry backpressure --- lib/write-entry.js | 28 ++++++++++++++++++++++------ test/write-entry.js | 3 ++- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/lib/write-entry.js b/lib/write-entry.js index 906bfc28..4385c040 100644 --- a/lib/write-entry.js +++ b/lib/write-entry.js @@ -21,6 +21,8 @@ const OPENFILE = Symbol('openfile') const ONOPENFILE = Symbol('onopenfile') const CLOSE = Symbol('close') const MODE = Symbol('mode') +const AWAITDRAIN = Symbol('awaitDrain') +const ONDRAIN = Symbol('ondrain') const warner = require('./warn-mixin.js') const winchars = require('./winchars.js') const stripAbsolutePath = require('./strip-absolute-path.js') @@ -232,7 +234,7 @@ const WriteEntry = warner(class WriteEntry extends MiniPass { this.pos = 0 this.remain = this.stat.size this.length = this.buf.length - this[READ](this.stat.size) + this[READ]() } [READ] () { @@ -284,13 +286,23 @@ const WriteEntry = warner(class WriteEntry extends MiniPass { const writeBuf = this.offset === 0 && bytesRead === this.buf.length ? this.buf : this.buf.slice(this.offset, this.offset + bytesRead) - this.remain -= bytesRead - this.blockRemain -= bytesRead - this.pos += bytesRead - this.offset += bytesRead + this.remain -= writeBuf.length + this.blockRemain -= writeBuf.length + this.pos += writeBuf.length + this.offset += writeBuf.length + + const flushed = this.write(writeBuf) + if (!flushed) + this[AWAITDRAIN](() => this[ONDRAIN]()) + else + this[ONDRAIN]() + } - this.write(writeBuf) + [AWAITDRAIN] (cb) { + this.once('drain', cb) + } + [ONDRAIN] () { if (!this.remain) { if (this.blockRemain) this.write(Buffer.alloc(this.blockRemain)) @@ -339,6 +351,10 @@ class WriteEntrySync extends WriteEntry { } } + [AWAITDRAIN] (cb) { + cb() + } + [CLOSE] (cb) { fs.closeSync(this.fd) cb() diff --git a/test/write-entry.js b/test/write-entry.js index c177f89e..9e5e33cd 100644 --- a/test/write-entry.js +++ b/test/write-entry.js @@ -254,6 +254,7 @@ t.test('zero-byte file, but close fails', t => { t.match(er, { message: 'poop' }) t.end() }) + ws.resume() }) t.test('hardlinks', t => { @@ -597,7 +598,7 @@ t.test('read overflow expectation', t => { t.throws(_ => new WriteEntry.Sync(f, { cwd: files, maxReadSize: 2 }), expect) new WriteEntry(f, { cwd: files, maxReadSize: 2 }).on('error', er => { t.match(er, expect) - }) + }).resume() }) t.test('short reads', t => {