From 14810d342af9b611b67894256869c4246ab59f87 Mon Sep 17 00:00:00 2001 From: George Fu Date: Fri, 19 May 2023 15:53:45 -0400 Subject: [PATCH] fix(node-http-handler): open promise handle while waiting for http continue (#4719) --- .../src/node-http-handler.spec.ts | 18 ++++++---- .../src/node-http-handler.ts | 17 +++++++-- .../src/node-http2-handler.spec.ts | 4 +-- .../src/node-http2-handler.ts | 34 ++++++++++++------ .../src/write-request-body.ts | 36 +++++++++++++++---- 5 files changed, 79 insertions(+), 30 deletions(-) diff --git a/packages/node-http-handler/src/node-http-handler.spec.ts b/packages/node-http-handler/src/node-http-handler.spec.ts index def1b5f8d1de..7ecfa1a93eb0 100644 --- a/packages/node-http-handler/src/node-http-handler.spec.ts +++ b/packages/node-http-handler/src/node-http-handler.spec.ts @@ -18,18 +18,19 @@ describe("NodeHttpHandler", () => { let hRequestSpy: jest.SpyInstance; let hsRequestSpy: jest.SpyInstance; const randomMaxSocket = Math.round(Math.random() * 50) + 1; - const mockRequestImpl = (_options, cb) => { + const mockRequestImpl = (protocol: string) => (_options, cb) => { cb({ statusCode: 200, body: "body", headers: {}, + protocol, }); - return new http.ClientRequest(_options); + return new http.ClientRequest({ ..._options, protocol }); }; beforeEach(() => { - hRequestSpy = jest.spyOn(http, "request").mockImplementation(mockRequestImpl); - hsRequestSpy = jest.spyOn(https, "request").mockImplementation(mockRequestImpl); + hRequestSpy = jest.spyOn(http, "request").mockImplementation(mockRequestImpl("http:")); + hsRequestSpy = jest.spyOn(https, "request").mockImplementation(mockRequestImpl("https:")); }); afterEach(() => { @@ -97,8 +98,8 @@ describe("NodeHttpHandler", () => { return { connectionTimeout: 12345, socketTimeout: 12345, - httpAgent: null, - httpsAgent: null, + httpAgent: void 0, + httpsAgent: void 0, }; }; @@ -118,7 +119,10 @@ describe("NodeHttpHandler", () => { }); describe("http", () => { - const mockHttpServer: HttpServer = createMockHttpServer().listen(54321); + let mockHttpServer: HttpServer; + beforeAll(() => { + mockHttpServer = createMockHttpServer().listen(54321); + }); afterEach(() => { mockHttpServer.removeAllListeners("request"); diff --git a/packages/node-http-handler/src/node-http-handler.ts b/packages/node-http-handler/src/node-http-handler.ts index 4f7dbc4cf0c8..2fa5d6a4e685 100644 --- a/packages/node-http-handler/src/node-http-handler.ts +++ b/packages/node-http-handler/src/node-http-handler.ts @@ -7,9 +7,9 @@ import { Agent as hsAgent, request as hsRequest, RequestOptions } from "https"; import { NODEJS_TIMEOUT_ERROR_CODES } from "./constants"; import { getTransformedHeaders } from "./get-transformed-headers"; import { setConnectionTimeout } from "./set-connection-timeout"; +import { setSocketKeepAlive } from "./set-socket-keep-alive"; import { setSocketTimeout } from "./set-socket-timeout"; import { writeRequestBody } from "./write-request-body"; -import { setSocketKeepAlive } from "./set-socket-keep-alive"; /** * Represents the http options that can be passed to a node http client. @@ -93,7 +93,17 @@ export class NodeHttpHandler implements HttpHandler { if (!this.config) { this.config = await this.configProvider; } - return new Promise((resolve, reject) => { + return new Promise((_resolve, _reject) => { + let writeRequestBodyPromise: Promise | undefined = undefined; + const resolve = async (arg: { response: HttpResponse }) => { + await writeRequestBodyPromise; + _resolve(arg); + }; + const reject = async (arg: unknown) => { + await writeRequestBodyPromise; + _reject(arg); + }; + if (!this.config) { throw new Error("Node HTTP request handler config is not resolved"); } @@ -120,6 +130,7 @@ export class NodeHttpHandler implements HttpHandler { // create the http request const requestFunc = isSSL ? hsRequest : hRequest; + const req = requestFunc(nodeHttpsOptions, (res) => { const httpResponse = new HttpResponse({ statusCode: res.statusCode || -1, @@ -163,7 +174,7 @@ export class NodeHttpHandler implements HttpHandler { }); } - writeRequestBody(req, request); + writeRequestBodyPromise = writeRequestBody(req, request, this.config.requestTimeout); }); } } diff --git a/packages/node-http-handler/src/node-http2-handler.spec.ts b/packages/node-http-handler/src/node-http2-handler.spec.ts index af29c1b23b86..f10b2ce94f04 100644 --- a/packages/node-http-handler/src/node-http2-handler.spec.ts +++ b/packages/node-http-handler/src/node-http2-handler.spec.ts @@ -15,7 +15,7 @@ describe(NodeHttp2Handler.name, () => { const protocol = "http:"; const hostname = "localhost"; const port = 45321; - let mockH2Server = undefined; + let mockH2Server: any = undefined; let mockH2Servers: Record = {}; const authority = `${protocol}//${hostname}:${port}/`; @@ -233,7 +233,7 @@ describe(NodeHttp2Handler.name, () => { // ...and validate that the mocked response is received const responseBody = await new Promise((resolve) => { - const buffers = []; + const buffers: any[] = []; resultReader.on("data", (chunk) => buffers.push(chunk)); resultReader.on("close", () => { resolve(Buffer.concat(buffers).toString("utf8")); diff --git a/packages/node-http-handler/src/node-http2-handler.ts b/packages/node-http-handler/src/node-http2-handler.ts index 631a6d075955..d5df770ec485 100644 --- a/packages/node-http-handler/src/node-http2-handler.ts +++ b/packages/node-http-handler/src/node-http2-handler.ts @@ -76,17 +76,27 @@ export class NodeHttp2Handler implements HttpHandler { } } const { requestTimeout, disableConcurrentStreams } = this.config; - return new Promise((resolve, rejectOriginal) => { + return new Promise((_resolve, _reject) => { // It's redundant to track fulfilled because promises use the first resolution/rejection // but avoids generating unnecessary stack traces in the "close" event handler. let fulfilled = false; + let writeRequestBodyPromise: Promise | undefined = undefined; + const resolve = async (arg: { response: HttpResponse }) => { + await writeRequestBodyPromise; + _resolve(arg); + }; + const reject = async (arg: unknown) => { + await writeRequestBodyPromise; + _reject(arg); + }; + // if the request was already aborted, prevent doing extra work if (abortSignal?.aborted) { fulfilled = true; const abortError = new Error("Request aborted"); abortError.name = "AbortError"; - rejectOriginal(abortError); + reject(abortError); return; } @@ -98,12 +108,12 @@ export class NodeHttp2Handler implements HttpHandler { disableConcurrentStreams: disableConcurrentStreams || false, } as ConnectConfiguration); - const reject = (err: Error) => { + const rejectWithDestroy = (err: Error) => { if (disableConcurrentStreams) { this.destroySession(session); } fulfilled = true; - rejectOriginal(err); + reject(err); }; const queryString = buildQueryString(query || {}); @@ -138,7 +148,7 @@ export class NodeHttp2Handler implements HttpHandler { req.close(); const timeoutError = new Error(`Stream timed out because of no activity for ${requestTimeout} ms`); timeoutError.name = "TimeoutError"; - reject(timeoutError); + rejectWithDestroy(timeoutError); }); } @@ -147,17 +157,19 @@ export class NodeHttp2Handler implements HttpHandler { req.close(); const abortError = new Error("Request aborted"); abortError.name = "AbortError"; - reject(abortError); + rejectWithDestroy(abortError); }; } // Set up handlers for errors req.on("frameError", (type: number, code: number, id: number) => { - reject(new Error(`Frame type id ${type} in stream id ${id} has failed with code ${code}.`)); + rejectWithDestroy(new Error(`Frame type id ${type} in stream id ${id} has failed with code ${code}.`)); }); - req.on("error", reject); + req.on("error", rejectWithDestroy); req.on("aborted", () => { - reject(new Error(`HTTP/2 stream is abnormally aborted in mid-communication with result code ${req.rstCode}.`)); + rejectWithDestroy( + new Error(`HTTP/2 stream is abnormally aborted in mid-communication with result code ${req.rstCode}.`) + ); }); // The HTTP/2 error code used when closing the stream can be retrieved using the @@ -169,11 +181,11 @@ export class NodeHttp2Handler implements HttpHandler { session.destroy(); } if (!fulfilled) { - reject(new Error("Unexpected error: http2 request did not get a response")); + rejectWithDestroy(new Error("Unexpected error: http2 request did not get a response")); } }); - writeRequestBody(req, request); + writeRequestBodyPromise = writeRequestBody(req, request, requestTimeout); }); } diff --git a/packages/node-http-handler/src/write-request-body.ts b/packages/node-http-handler/src/write-request-body.ts index bc21ff6e5b79..f933fa075fba 100644 --- a/packages/node-http-handler/src/write-request-body.ts +++ b/packages/node-http-handler/src/write-request-body.ts @@ -3,15 +3,37 @@ import { ClientRequest } from "http"; import { ClientHttp2Stream } from "http2"; import { Readable } from "stream"; -export function writeRequestBody(httpRequest: ClientRequest | ClientHttp2Stream, request: HttpRequest) { - const expect = request.headers["Expect"] || request.headers["expect"]; +const MIN_WAIT_TIME = 1000; + +/** + * This resolves when writeBody has been called. + * + * @param httpRequest - opened Node.js request. + * @param request - container with the request body. + * @param maxContinueTimeoutMs - maximum time to wait for the continue event. Minimum of 1000ms. + */ +export async function writeRequestBody( + httpRequest: ClientRequest | ClientHttp2Stream, + request: HttpRequest, + maxContinueTimeoutMs = MIN_WAIT_TIME +): Promise { + const headers = request.headers ?? {}; + const expect = headers["Expect"] || headers["expect"]; + if (expect === "100-continue") { - httpRequest.on("continue", () => { - writeBody(httpRequest, request.body); - }); - } else { - writeBody(httpRequest, request.body); + await Promise.race([ + new Promise((resolve) => { + setTimeout(resolve, Math.max(MIN_WAIT_TIME, maxContinueTimeoutMs)); + }), + new Promise((resolve) => { + httpRequest.on("continue", () => { + resolve(); + }); + }), + ]); } + + writeBody(httpRequest, request.body); } function writeBody(