Skip to content

Commit

Permalink
fix(node-http-handler): open promise handle while waiting for http co…
Browse files Browse the repository at this point in the history
…ntinue (#4719)
  • Loading branch information
kuhe authored May 19, 2023
1 parent 345835a commit 14810d3
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 30 deletions.
18 changes: 11 additions & 7 deletions packages/node-http-handler/src/node-http-handler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ describe("NodeHttpHandler", () => {
let hRequestSpy: jest.SpyInstance;
let hsRequestSpy: jest.SpyInstance;
const randomMaxSocket = Math.round(Math.random() * 50) + 1;
const mockRequestImpl = (_options, cb) => {
const mockRequestImpl = (protocol: string) => (_options, cb) => {
cb({
statusCode: 200,
body: "body",
headers: {},
protocol,
});
return new http.ClientRequest(_options);
return new http.ClientRequest({ ..._options, protocol });
};

beforeEach(() => {
hRequestSpy = jest.spyOn(http, "request").mockImplementation(mockRequestImpl);
hsRequestSpy = jest.spyOn(https, "request").mockImplementation(mockRequestImpl);
hRequestSpy = jest.spyOn(http, "request").mockImplementation(mockRequestImpl("http:"));
hsRequestSpy = jest.spyOn(https, "request").mockImplementation(mockRequestImpl("https:"));
});

afterEach(() => {
Expand Down Expand Up @@ -97,8 +98,8 @@ describe("NodeHttpHandler", () => {
return {
connectionTimeout: 12345,
socketTimeout: 12345,
httpAgent: null,
httpsAgent: null,
httpAgent: void 0,
httpsAgent: void 0,
};
};

Expand All @@ -118,7 +119,10 @@ describe("NodeHttpHandler", () => {
});

describe("http", () => {
const mockHttpServer: HttpServer = createMockHttpServer().listen(54321);
let mockHttpServer: HttpServer;
beforeAll(() => {
mockHttpServer = createMockHttpServer().listen(54321);
});

afterEach(() => {
mockHttpServer.removeAllListeners("request");
Expand Down
17 changes: 14 additions & 3 deletions packages/node-http-handler/src/node-http-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import { Agent as hsAgent, request as hsRequest, RequestOptions } from "https";
import { NODEJS_TIMEOUT_ERROR_CODES } from "./constants";
import { getTransformedHeaders } from "./get-transformed-headers";
import { setConnectionTimeout } from "./set-connection-timeout";
import { setSocketKeepAlive } from "./set-socket-keep-alive";
import { setSocketTimeout } from "./set-socket-timeout";
import { writeRequestBody } from "./write-request-body";
import { setSocketKeepAlive } from "./set-socket-keep-alive";

/**
* Represents the http options that can be passed to a node http client.
Expand Down Expand Up @@ -93,7 +93,17 @@ export class NodeHttpHandler implements HttpHandler {
if (!this.config) {
this.config = await this.configProvider;
}
return new Promise((resolve, reject) => {
return new Promise((_resolve, _reject) => {
let writeRequestBodyPromise: Promise<void> | undefined = undefined;
const resolve = async (arg: { response: HttpResponse }) => {
await writeRequestBodyPromise;
_resolve(arg);
};
const reject = async (arg: unknown) => {
await writeRequestBodyPromise;
_reject(arg);
};

if (!this.config) {
throw new Error("Node HTTP request handler config is not resolved");
}
Expand All @@ -120,6 +130,7 @@ export class NodeHttpHandler implements HttpHandler {

// create the http request
const requestFunc = isSSL ? hsRequest : hRequest;

const req = requestFunc(nodeHttpsOptions, (res) => {
const httpResponse = new HttpResponse({
statusCode: res.statusCode || -1,
Expand Down Expand Up @@ -163,7 +174,7 @@ export class NodeHttpHandler implements HttpHandler {
});
}

writeRequestBody(req, request);
writeRequestBodyPromise = writeRequestBody(req, request, this.config.requestTimeout);
});
}
}
4 changes: 2 additions & 2 deletions packages/node-http-handler/src/node-http2-handler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ describe(NodeHttp2Handler.name, () => {
const protocol = "http:";
const hostname = "localhost";
const port = 45321;
let mockH2Server = undefined;
let mockH2Server: any = undefined;
let mockH2Servers: Record<number, Http2Server> = {};

const authority = `${protocol}//${hostname}:${port}/`;
Expand Down Expand Up @@ -233,7 +233,7 @@ describe(NodeHttp2Handler.name, () => {

// ...and validate that the mocked response is received
const responseBody = await new Promise((resolve) => {
const buffers = [];
const buffers: any[] = [];
resultReader.on("data", (chunk) => buffers.push(chunk));
resultReader.on("close", () => {
resolve(Buffer.concat(buffers).toString("utf8"));
Expand Down
34 changes: 23 additions & 11 deletions packages/node-http-handler/src/node-http2-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,27 @@ export class NodeHttp2Handler implements HttpHandler {
}
}
const { requestTimeout, disableConcurrentStreams } = this.config;
return new Promise((resolve, rejectOriginal) => {
return new Promise((_resolve, _reject) => {
// It's redundant to track fulfilled because promises use the first resolution/rejection
// but avoids generating unnecessary stack traces in the "close" event handler.
let fulfilled = false;

let writeRequestBodyPromise: Promise<void> | undefined = undefined;
const resolve = async (arg: { response: HttpResponse }) => {
await writeRequestBodyPromise;
_resolve(arg);
};
const reject = async (arg: unknown) => {
await writeRequestBodyPromise;
_reject(arg);
};

// if the request was already aborted, prevent doing extra work
if (abortSignal?.aborted) {
fulfilled = true;
const abortError = new Error("Request aborted");
abortError.name = "AbortError";
rejectOriginal(abortError);
reject(abortError);
return;
}

Expand All @@ -98,12 +108,12 @@ export class NodeHttp2Handler implements HttpHandler {
disableConcurrentStreams: disableConcurrentStreams || false,
} as ConnectConfiguration);

const reject = (err: Error) => {
const rejectWithDestroy = (err: Error) => {
if (disableConcurrentStreams) {
this.destroySession(session);
}
fulfilled = true;
rejectOriginal(err);
reject(err);
};

const queryString = buildQueryString(query || {});
Expand Down Expand Up @@ -138,7 +148,7 @@ export class NodeHttp2Handler implements HttpHandler {
req.close();
const timeoutError = new Error(`Stream timed out because of no activity for ${requestTimeout} ms`);
timeoutError.name = "TimeoutError";
reject(timeoutError);
rejectWithDestroy(timeoutError);
});
}

Expand All @@ -147,17 +157,19 @@ export class NodeHttp2Handler implements HttpHandler {
req.close();
const abortError = new Error("Request aborted");
abortError.name = "AbortError";
reject(abortError);
rejectWithDestroy(abortError);
};
}

// Set up handlers for errors
req.on("frameError", (type: number, code: number, id: number) => {
reject(new Error(`Frame type id ${type} in stream id ${id} has failed with code ${code}.`));
rejectWithDestroy(new Error(`Frame type id ${type} in stream id ${id} has failed with code ${code}.`));
});
req.on("error", reject);
req.on("error", rejectWithDestroy);
req.on("aborted", () => {
reject(new Error(`HTTP/2 stream is abnormally aborted in mid-communication with result code ${req.rstCode}.`));
rejectWithDestroy(
new Error(`HTTP/2 stream is abnormally aborted in mid-communication with result code ${req.rstCode}.`)
);
});

// The HTTP/2 error code used when closing the stream can be retrieved using the
Expand All @@ -169,11 +181,11 @@ export class NodeHttp2Handler implements HttpHandler {
session.destroy();
}
if (!fulfilled) {
reject(new Error("Unexpected error: http2 request did not get a response"));
rejectWithDestroy(new Error("Unexpected error: http2 request did not get a response"));
}
});

writeRequestBody(req, request);
writeRequestBodyPromise = writeRequestBody(req, request, requestTimeout);
});
}

Expand Down
36 changes: 29 additions & 7 deletions packages/node-http-handler/src/write-request-body.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,37 @@ import { ClientRequest } from "http";
import { ClientHttp2Stream } from "http2";
import { Readable } from "stream";

export function writeRequestBody(httpRequest: ClientRequest | ClientHttp2Stream, request: HttpRequest) {
const expect = request.headers["Expect"] || request.headers["expect"];
const MIN_WAIT_TIME = 1000;

/**
* This resolves when writeBody has been called.
*
* @param httpRequest - opened Node.js request.
* @param request - container with the request body.
* @param maxContinueTimeoutMs - maximum time to wait for the continue event. Minimum of 1000ms.
*/
export async function writeRequestBody(
httpRequest: ClientRequest | ClientHttp2Stream,
request: HttpRequest,
maxContinueTimeoutMs = MIN_WAIT_TIME
): Promise<void> {
const headers = request.headers ?? {};
const expect = headers["Expect"] || headers["expect"];

if (expect === "100-continue") {
httpRequest.on("continue", () => {
writeBody(httpRequest, request.body);
});
} else {
writeBody(httpRequest, request.body);
await Promise.race<void>([
new Promise((resolve) => {
setTimeout(resolve, Math.max(MIN_WAIT_TIME, maxContinueTimeoutMs));
}),
new Promise((resolve) => {
httpRequest.on("continue", () => {
resolve();
});
}),
]);
}

writeBody(httpRequest, request.body);
}

function writeBody(
Expand Down

0 comments on commit 14810d3

Please sign in to comment.