diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js index 8e6a303a2e27a5..03c0ed9159f64a 100644 --- a/lib/internal/child_process.js +++ b/lib/internal/child_process.js @@ -63,6 +63,7 @@ let freeParser; let HTTPParser; const MAX_HANDLE_RETRANSMISSIONS = 3; +const kIsUsedAsStdio = Symbol('kIsUsedAsStdio'); // This object contain function to convert TCP objects to native handle objects // and back again. @@ -278,8 +279,14 @@ function flushStdio(subprocess) { for (var i = 0; i < stdio.length; i++) { const stream = stdio[i]; - if (!stream || !stream.readable || stream._readableState.readableListening) + // TODO(addaleax): This doesn't necessarily account for all the ways in + // which data can be read from a stream, e.g. being consumed on the + // native layer directly as a StreamBase. + if (!stream || !stream.readable || + stream._readableState.readableListening || + stream[kIsUsedAsStdio]) { continue; + } stream.resume(); } } @@ -384,12 +391,16 @@ ChildProcess.prototype.spawn = function(options) { continue; } - // The stream is already cloned and piped, thus close it. + // The stream is already cloned and piped, thus stop its readable side, + // otherwise we might attempt to read from the stream when at the same time + // the child process does. if (stream.type === 'wrap') { - stream.handle.close(); - if (stream._stdio && stream._stdio instanceof EventEmitter) { - stream._stdio.emit('close'); - } + stream.handle.reading = false; + stream.handle.readStop(); + stream._stdio.pause(); + stream._stdio.readableFlowing = false; + stream._stdio._readableState.reading = false; + stream._stdio[kIsUsedAsStdio] = true; continue; } diff --git a/test/parallel/test-child-process-pipe-dataflow.js b/test/parallel/test-child-process-pipe-dataflow.js index f5068b5d366468..88a31f4ff8429b 100644 --- a/test/parallel/test-child-process-pipe-dataflow.js +++ b/test/parallel/test-child-process-pipe-dataflow.js @@ -33,6 +33,10 @@ const MB = KB * KB; grep = spawn('grep', ['x'], { stdio: [cat.stdout, 'pipe', 'pipe'] }); wc = spawn('wc', ['-c'], { stdio: [grep.stdout, 'pipe', 'pipe'] }); + // Extra checks: We never try to start reading data ourselves. + cat.stdout._handle.readStart = common.mustNotCall(); + grep.stdout._handle.readStart = common.mustNotCall(); + [cat, grep, wc].forEach((child, index) => { child.stderr.on('data', (d) => { // Don't want to assert here, as we might miss error code info. diff --git a/test/parallel/test-child-process-server-close.js b/test/parallel/test-child-process-server-close.js index ec95fed67b4fa7..d70926f2e8278e 100644 --- a/test/parallel/test-child-process-server-close.js +++ b/test/parallel/test-child-process-server-close.js @@ -8,11 +8,11 @@ const tmpdir = require('../common/tmpdir'); tmpdir.refresh(); const server = net.createServer((conn) => { - conn.on('close', common.mustCall()); - spawn(process.execPath, ['-v'], { stdio: ['ignore', conn, 'ignore'] - }).on('close', common.mustCall()); + }).on('close', common.mustCall(() => { + conn.end(); + })); }).listen(common.PIPE, () => { const client = net.connect(common.PIPE, common.mustCall()); client.on('data', () => { diff --git a/test/parallel/test-child-process-stdio-merge-stdouts-into-cat.js b/test/parallel/test-child-process-stdio-merge-stdouts-into-cat.js new file mode 100644 index 00000000000000..64373e9e160937 --- /dev/null +++ b/test/parallel/test-child-process-stdio-merge-stdouts-into-cat.js @@ -0,0 +1,30 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { spawn } = require('child_process'); + +// Regression test for https://github.com/nodejs/node/issues/27097. +// Check that (cat [p1] ; cat [p2]) | cat [p3] works. + +const p3 = spawn('cat', { stdio: ['pipe', 'pipe', 'inherit'] }); +const p1 = spawn('cat', { stdio: ['pipe', p3.stdin, 'inherit'] }); +const p2 = spawn('cat', { stdio: ['pipe', p3.stdin, 'inherit'] }); +p3.stdout.setEncoding('utf8'); + +// Write three different chunks: +// - 'hello' from this process to p1 to p3 back to us +// - 'world' from this process to p2 to p3 back to us +// - 'foobar' from this process to p3 back to us +// Do so sequentially in order to avoid race conditions. +p1.stdin.end('hello\n'); +p3.stdout.once('data', common.mustCall((chunk) => { + assert.strictEqual(chunk, 'hello\n'); + p2.stdin.end('world\n'); + p3.stdout.once('data', common.mustCall((chunk) => { + assert.strictEqual(chunk, 'world\n'); + p3.stdin.end('foobar\n'); + p3.stdout.once('data', common.mustCall((chunk) => { + assert.strictEqual(chunk, 'foobar\n'); + })); + })); +})); diff --git a/test/parallel/test-child-process-stdio-reuse-readable-stdio.js b/test/parallel/test-child-process-stdio-reuse-readable-stdio.js new file mode 100644 index 00000000000000..263270f8e8891f --- /dev/null +++ b/test/parallel/test-child-process-stdio-reuse-readable-stdio.js @@ -0,0 +1,27 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { spawn } = require('child_process'); + +// Check that, once a child process has ended, it’s safe to read from a pipe +// that the child had used as input. +// We simulate that using cat | (head -n1; ...) + +const p1 = spawn('cat', { stdio: ['pipe', 'pipe', 'inherit'] }); +const p2 = spawn('head', ['-n1'], { stdio: [p1.stdout, 'pipe', 'inherit'] }); + +// First, write the line that gets passed through p2, making 'head' exit. +p1.stdin.write('hello\n'); +p2.stdout.setEncoding('utf8'); +p2.stdout.on('data', common.mustCall((chunk) => { + assert.strictEqual(chunk, 'hello\n'); +})); +p2.on('exit', common.mustCall(() => { + // We can now use cat’s output, because 'head' is no longer reading from it. + p1.stdin.end('world\n'); + p1.stdout.setEncoding('utf8'); + p1.stdout.on('data', common.mustCall((chunk) => { + assert.strictEqual(chunk, 'world\n'); + })); + p1.stdout.resume(); +}));