Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronously update internal sockets length so http.Agent pooling is used #300

Merged
merged 8 commits into from
Mar 30, 2024
Merged
5 changes: 5 additions & 0 deletions .changeset/seven-colts-flash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"agent-base": patch
---

Synchronously update internal sockets length so `http.Agent` pooling is used
85 changes: 77 additions & 8 deletions packages/agent-base/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as net from 'net';
import * as tls from 'tls';
import * as http from 'http';
import { Agent as HttpsAgent } from 'https';
import type { Duplex } from 'stream';

export * from './helpers';
Expand Down Expand Up @@ -77,6 +78,65 @@ export abstract class Agent extends http.Agent {
);
}

// In order to support async signatures in `connect()` and Node's native
// connection pooling in `http.Agent`, the array of sockets for each origin
// has to be updated synchronously. This is so the length of the array is
// accurate when `addRequest()` is next called. We achieve this by creating a
// fake socket and adding it to `sockets[origin]` and incrementing
// `totalSocketCount`.
private incrementSockets(name: string) {
// If `maxSockets` and `maxTotalSockets` are both Infinity then there is no
// need to create a fake socket because Node.js native connection pooling
// will never be invoked.
if (this.maxSockets === Infinity && this.maxTotalSockets === Infinity) {
return null;
}
// All instances of `sockets` are expected TypeScript errors. The
// alternative is to add it as a private property of this class but that
// will break TypeScript subclassing.
if (!this.sockets[name]) {
// @ts-expect-error `sockets` is readonly in `@types/node`
this.sockets[name] = [];
}
const fakeSocket = new net.Socket({ writable: false });
(this.sockets[name] as net.Socket[]).push(fakeSocket);
// @ts-expect-error `totalSocketCount` isn't defined in `@types/node`
this.totalSocketCount++;
return fakeSocket;
}

private decrementSockets(name: string, socket: null | net.Socket) {
if (!this.sockets[name] || socket === null) {
return;
}
const sockets = this.sockets[name] as net.Socket[];
const index = sockets.indexOf(socket);
if (index !== -1) {
sockets.splice(index, 1);
// @ts-expect-error `totalSocketCount` isn't defined in `@types/node`
this.totalSocketCount--;
if (sockets.length === 0) {
// @ts-expect-error `sockets` is readonly in `@types/node`
delete this.sockets[name];
}
}
}

// In order to properly update the socket pool, we need to call `getName()` on
// the core `https.Agent` if it is a secureEndpoint.
getName(options: AgentConnectOpts): string {
const secureEndpoint =
typeof options.secureEndpoint === 'boolean'
? options.secureEndpoint
: this.isSecureEndpoint(options);
if (secureEndpoint) {
// @ts-expect-error `getName()` isn't defined in `@types/node`
return HttpsAgent.prototype.getName.call(this, options);
}
// @ts-expect-error `getName()` isn't defined in `@types/node`
return super.getName(options);
}

createSocket(
req: http.ClientRequest,
options: AgentConnectOpts,
Expand All @@ -86,17 +146,26 @@ export abstract class Agent extends http.Agent {
...options,
secureEndpoint: this.isSecureEndpoint(options),
};
const name = this.getName(connectOpts);
const fakeSocket = this.incrementSockets(name);
Promise.resolve()
.then(() => this.connect(req, connectOpts))
.then((socket) => {
if (socket instanceof http.Agent) {
// @ts-expect-error `addRequest()` isn't defined in `@types/node`
return socket.addRequest(req, connectOpts);
.then(
(socket) => {
this.decrementSockets(name, fakeSocket);
if (socket instanceof http.Agent) {
// @ts-expect-error `addRequest()` isn't defined in `@types/node`
return socket.addRequest(req, connectOpts);
}
this[INTERNAL].currentSocket = socket;
// @ts-expect-error `createSocket()` isn't defined in `@types/node`
super.createSocket(req, options, cb);
},
(err) => {
this.decrementSockets(name, fakeSocket);
cb(err);
}
this[INTERNAL].currentSocket = socket;
// @ts-expect-error `createSocket()` isn't defined in `@types/node`
super.createSocket(req, options, cb);
}, cb);
);
}

createConnection(): Duplex {
Expand Down
117 changes: 117 additions & 0 deletions packages/agent-base/test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ describe('Agent (TypeScript)', () => {
) {
gotCallback = true;
assert(opts.secureEndpoint === false);
assert.equal(this.getName(opts), `127.0.0.1:${port}:`);
return net.connect(opts);
}
}
Expand Down Expand Up @@ -308,6 +309,60 @@ describe('Agent (TypeScript)', () => {
server2.close();
}
});

it('should support `keepAlive: true` with `maxSockets`', async () => {
let reqCount = 0;
let connectCount = 0;

class MyAgent extends Agent {
async connect(
_req: http.ClientRequest,
opts: AgentConnectOpts
) {
connectCount++;
assert(opts.secureEndpoint === false);
await sleep(10);
return net.connect(opts);
}
}
const agent = new MyAgent({ keepAlive: true, maxSockets: 1 });

const server = http.createServer(async (req, res) => {
expect(req.headers.connection).toEqual('keep-alive');
reqCount++;
await sleep(10);
res.end();
});
const addr = await listen(server);

try {
const resPromise = req(new URL('/foo', addr), { agent });
const res2Promise = req(new URL('/another', addr), {
agent,
});

const res = await resPromise;
expect(reqCount).toEqual(1);
expect(connectCount).toEqual(1);
expect(res.headers.connection).toEqual('keep-alive');

res.resume();
const s1 = res.socket;
await once(s1, 'free');

const res2 = await res2Promise;
expect(reqCount).toEqual(2);
expect(connectCount).toEqual(1);
expect(res2.headers.connection).toEqual('keep-alive');
assert(res2.socket === s1);

res2.resume();
await once(res2.socket, 'free');
} finally {
agent.destroy();
server.close();
}
});
});

describe('"https" module', () => {
Expand All @@ -322,6 +377,10 @@ describe('Agent (TypeScript)', () => {
): net.Socket {
gotCallback = true;
assert(opts.secureEndpoint === true);
assert.equal(
this.getName(opts),
`127.0.0.1:${port}::::::::false:::::::::::::`
);
return tls.connect(opts);
}
}
Expand Down Expand Up @@ -509,5 +568,63 @@ describe('Agent (TypeScript)', () => {
server.close();
}
});

it('should support `keepAlive: true` with `maxSockets`', async () => {
let reqCount = 0;
let connectCount = 0;

class MyAgent extends Agent {
async connect(
_req: http.ClientRequest,
opts: AgentConnectOpts
) {
connectCount++;
assert(opts.secureEndpoint === true);
await sleep(10);
return tls.connect(opts);
}
}
const agent = new MyAgent({ keepAlive: true, maxSockets: 1 });

const server = https.createServer(sslOptions, async (req, res) => {
expect(req.headers.connection).toEqual('keep-alive');
reqCount++;
await sleep(10);
res.end();
});
const addr = await listen(server);

try {
const resPromise = req(new URL('/foo', addr), {
agent,
rejectUnauthorized: false,
});
const res2Promise = req(new URL('/another', addr), {
agent,
rejectUnauthorized: false,
});

const res = await resPromise;
expect(reqCount).toEqual(1);
expect(connectCount).toEqual(1);
expect(res.headers.connection).toEqual('keep-alive');

res.resume();
const s1 = res.socket;
await once(s1, 'free');

const res2 = await res2Promise;
expect(reqCount).toEqual(2);
expect(connectCount).toEqual(1);
expect(res2.headers.connection).toEqual('keep-alive');
assert(res2.socket === s1);

res2.resume();
await once(res2.socket, 'free');
} finally {
agent.destroy();
server.close();
}
});
});
});