-
-
Notifications
You must be signed in to change notification settings - Fork 6.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Re-attach stdout and stderr from new processes after retries #8087
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,9 +6,11 @@ | |
*/ | ||
|
||
import path from 'path'; | ||
import {PassThrough} from 'stream'; | ||
// ESLint doesn't know about this experimental module | ||
// eslint-disable-next-line import/no-unresolved | ||
import {Worker} from 'worker_threads'; | ||
import mergeStream from 'merge-stream'; | ||
|
||
import { | ||
CHILD_MESSAGE_INITIALIZE, | ||
|
@@ -29,10 +31,17 @@ export default class ExperimentalWorker implements WorkerInterface { | |
private _onProcessEnd!: OnEnd; | ||
private _request: ChildMessage | null; | ||
private _retries!: number; | ||
private _stderr: ReturnType<typeof mergeStream> | null; | ||
private _stdout: ReturnType<typeof mergeStream> | null; | ||
private _fakeStream: PassThrough | null; | ||
|
||
constructor(options: WorkerOptions) { | ||
this._options = options; | ||
this._request = null; | ||
this._stderr = null; | ||
this._stdout = null; | ||
this._fakeStream = null; | ||
|
||
this.initialize(); | ||
} | ||
|
||
|
@@ -54,6 +63,26 @@ export default class ExperimentalWorker implements WorkerInterface { | |
}, | ||
}); | ||
|
||
if (this._worker.stdout) { | ||
if (!this._stdout) { | ||
// We need to add a permanent stream to the merged stream to prevent it | ||
// from ending when the subprocess stream ends | ||
this._stdout = mergeStream(this._getFakeStream()); | ||
} | ||
|
||
this._stdout.add(this._worker.stdout); | ||
} | ||
|
||
if (this._worker.stderr) { | ||
if (!this._stderr) { | ||
// We need to add a permanent stream to the merged stream to prevent it | ||
// from ending when the subprocess stream ends | ||
this._stderr = mergeStream(this._getFakeStream()); | ||
} | ||
|
||
this._stderr.add(this._worker.stderr); | ||
} | ||
|
||
this._worker.on('message', this.onMessage.bind(this)); | ||
this._worker.on('exit', this.onExit.bind(this)); | ||
|
||
|
@@ -82,6 +111,14 @@ export default class ExperimentalWorker implements WorkerInterface { | |
} | ||
} | ||
|
||
private _shutdown() { | ||
// End the permanent stream so the merged stream end too | ||
if (this._fakeStream) { | ||
this._fakeStream.end(); | ||
this._fakeStream = null; | ||
} | ||
} | ||
|
||
onMessage(response: ParentMessage) { | ||
let error; | ||
|
||
|
@@ -132,6 +169,8 @@ export default class ExperimentalWorker implements WorkerInterface { | |
if (this._request) { | ||
this._worker.postMessage(this._request); | ||
} | ||
} else { | ||
this._shutdown(); | ||
} | ||
} | ||
|
||
|
@@ -154,11 +193,18 @@ export default class ExperimentalWorker implements WorkerInterface { | |
return this._options.workerId; | ||
} | ||
|
||
getStdout() { | ||
return this._worker.stdout; | ||
getStdout(): NodeJS.ReadableStream | null { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the type defined in the interface, no? Does TS requires you to add this explicitly? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, otherwise I'm getting this error: [ts] Return type of public method from exported class has or is using name 'MergedStream' from external module "/Users/rubennorte/gitrepos/github/jest/node_modules/@types/merge-stream/index" but cannot be named. |
||
return this._stdout; | ||
} | ||
|
||
getStderr() { | ||
return this._worker.stderr; | ||
getStderr(): NodeJS.ReadableStream | null { | ||
return this._stderr; | ||
} | ||
|
||
private _getFakeStream() { | ||
if (!this._fakeStream) { | ||
this._fakeStream = new PassThrough(); | ||
} | ||
return this._fakeStream; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ugh, but they don't export the interface: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/b32ee2597779e6376db8c49e68d75c68e50b9287/types/merge-stream/index.d.ts#L11-L14