Skip to content

Commit

Permalink
node-transport+ws-transport: rewrite connectTimeout logic
Browse files Browse the repository at this point in the history
connect() function no longer accepts AbortSignal because it has
confusing semantics when combined with reopen logic.
  • Loading branch information
yoursunny committed Feb 4, 2024
1 parent 31d9d20 commit 60171d5
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 92 deletions.
1 change: 1 addition & 0 deletions pkg/node-transport/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"dependencies": {
"@ndn/l3face": "workspace:*",
"event-iterator": "^2.0.0",
"p-event": "^6.0.0",
"tslib": "^2.6.2",
"type-fest": "^4.10.2",
"url-format-lax": "^2.0.0",
Expand Down
17 changes: 17 additions & 0 deletions pkg/node-transport/src/impl-net-connect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import net from "node:net";

import { pEvent } from "p-event";

export async function connectAndWaitConnected(
opts: net.NetConnectOpts & { connectTimeout?: number },
): Promise<net.Socket> {
const sock = net.connect(opts);
try {
await pEvent(sock, "connect", { timeout: opts.connectTimeout ?? 10000 });
} catch (err: unknown) {
sock.destroy();
throw err;
}
sock.on("error", () => undefined);
return sock;
}
43 changes: 9 additions & 34 deletions pkg/node-transport/src/tcp-transport.ts
Original file line number Diff line number Diff line change
@@ -1,65 +1,43 @@
import net from "node:net";
import type { Socket, TcpSocketConnectOpts } from "node:net";

import { L3Face, StreamTransport } from "@ndn/l3face";
import type { SetOptional } from "type-fest";

import { joinHostPort } from "./hostport";
import { connectAndWaitConnected } from "./impl-net-connect";

/** TCP socket transport. */
export class TcpTransport extends StreamTransport<net.Socket> {
export class TcpTransport extends StreamTransport<Socket> {
/**
* Create a transport and connect to remote endpoint.
* @param host - Remote host (default is localhost) or endpoint address (with other options).
* @param port - Remote port. Default is 6363.
* @param opts - Other options.
* @see {@link TcpTransport.createFace}
*/
public static connect(
host?: string | (SetOptional<net.TcpSocketConnectOpts, "port"> & TcpTransport.Options),
public static async connect(
host?: string | (SetOptional<TcpSocketConnectOpts, "port"> & TcpTransport.Options),
port = 6363,
opts: TcpTransport.Options = {},
): Promise<TcpTransport> {
const combined: net.TcpSocketConnectOpts & TcpTransport.Options = {
const combined: TcpSocketConnectOpts & TcpTransport.Options = {
port,
noDelay: true,
...(typeof host === "string" ? { host } : host),
...opts,
};
const { connectTimeout = 10000, signal } = combined;

return new Promise<TcpTransport>((resolve, reject) => {
const sock = net.connect(combined);

let timeout: NodeJS.Timeout | undefined; // eslint-disable-line prefer-const
const fail = (err?: Error) => {
clearTimeout(timeout);
sock.destroy();
reject(err);
};
timeout = setTimeout(() => fail(new Error("connectTimeout")), connectTimeout);

const onabort = () => fail(new Error("abort"));
signal?.addEventListener("abort", onabort);

sock.on("error", () => undefined);
sock.once("error", fail);
sock.once("connect", () => {
clearTimeout(timeout);
sock.off("error", fail);
signal?.removeEventListener("abort", onabort);
resolve(new TcpTransport(sock, combined));
});
});
return new TcpTransport(await connectAndWaitConnected(combined), combined);
}

private constructor(sock: net.Socket, private readonly connectOpts: net.TcpSocketConnectOpts) {
private constructor(sock: Socket, private readonly connectOpts: TcpSocketConnectOpts) {
super(sock, {
describe: `TCP(${joinHostPort(sock.remoteAddress!, sock.remotePort!)})`,
local: sock.localAddress === sock.remoteAddress,
});
}

/** Reopen the transport by connecting again with the same options. */
/** Reopen the transport by making a new connection with the same options. */
public override reopen(): Promise<TcpTransport> {
return TcpTransport.connect(this.connectOpts);
}
Expand All @@ -73,9 +51,6 @@ export namespace TcpTransport {
* @defaultValue 10000
*/
connectTimeout?: number;

/** AbortSignal that allows canceling connection attempt via AbortController. */
signal?: AbortSignal;
}

/** Create a transport and add to forwarder. */
Expand Down
29 changes: 13 additions & 16 deletions pkg/node-transport/src/unix-transport.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,32 @@
import net from "node:net";
import type { IpcSocketConnectOpts, Socket } from "node:net";

import { L3Face, StreamTransport } from "@ndn/l3face";

import { connectAndWaitConnected } from "./impl-net-connect";

/** Unix socket transport. */
export class UnixTransport extends StreamTransport<net.Socket> {
export class UnixTransport extends StreamTransport<Socket> {
/**
* Create a transport and connect to remote endpoint.
* @param path - Unix socket path or IPC options.
* @see {@link UdpTransport.createFace}
* @see {@link UnixTransport.createFace}
*/
public static connect(path: string | net.IpcSocketConnectOpts): Promise<UnixTransport> {
const connectOpts: net.IpcNetConnectOpts = typeof path === "string" ? { path } : path;
return new Promise<UnixTransport>((resolve, reject) => {
const sock = net.connect(connectOpts);
sock.on("error", () => undefined);
sock.once("error", reject);
sock.once("connect", () => {
sock.off("error", reject);
resolve(new UnixTransport(sock, connectOpts));
});
});
public static async connect(path: string | IpcSocketConnectOpts): Promise<UnixTransport> {
const connectOpts: IpcSocketConnectOpts = typeof path === "string" ? { path } : path;
return new UnixTransport(
await connectAndWaitConnected(connectOpts),
connectOpts,
);
}

private constructor(sock: net.Socket, private readonly connectOpts: net.IpcNetConnectOpts) {
private constructor(sock: Socket, private readonly connectOpts: IpcSocketConnectOpts) {
super(sock, {
describe: `Unix(${connectOpts.path})`,
local: true,
});
}

/** Reopen the transport by connecting again with the same options. */
/** Reopen the transport by making a new connection with the same options. */
public override reopen(): Promise<UnixTransport> {
return UnixTransport.connect(this.connectOpts);
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ws-transport/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@
"@ndn/util": "workspace:*",
"@types/ws": "^8.5.10",
"event-iterator": "^2.0.0",
"p-event": "^6.0.0",
"tslib": "^2.6.2",
"ws": "^8.16.0"
},
"devDependencies": {
"@ndn/node-transport": "workspace:*",
"it-pushable": "^3.2.3",
"p-event": "^6.0.0"
"it-pushable": "^3.2.3"
},
"optionalDependencies": {
"bufferutil": "^4.0.8"
Expand Down
57 changes: 17 additions & 40 deletions pkg/ws-transport/src/ws-transport.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { L3Face, rxFromPacketIterable, Transport } from "@ndn/l3face";
import EventIterator from "event-iterator";
import { pEvent } from "p-event";
import type { WebSocket as WsWebSocket } from "ws";

import { changeBinaryType, extractMessage, makeWebSocket } from "./ws_node";
Expand All @@ -11,39 +12,20 @@ export class WsTransport extends Transport {
* @param uri - Server URI or existing WebSocket instance.
* @see {@link WsTransport.createFace}
*/
public static connect(uri: string | WebSocket | WsWebSocket, opts: WsTransport.Options = {}): Promise<WsTransport> {
const { connectTimeout = 10000, signal } = opts;
return new Promise<WsTransport>((resolve, reject) => {
const sock = typeof uri === "string" ? makeWebSocket(uri) : uri as unknown as WebSocket;
if (sock.readyState === sock.OPEN) {
resolve(new WsTransport(sock, opts));
return;
public static async connect(
uri: string | WebSocket | WsWebSocket,
opts: WsTransport.Options = {},
): Promise<WsTransport> {
const sock = typeof uri === "string" ? makeWebSocket(uri) : uri as unknown as WebSocket;
if (sock.readyState !== sock.OPEN) {
try {
await pEvent(sock, "open", { timeout: opts.connectTimeout ?? 10000 });
} catch (err: unknown) {
sock.close(1002);
throw err;
}

let timeout: NodeJS.Timeout | undefined; // eslint-disable-line prefer-const
const fail = (err?: Error) => {
clearTimeout(timeout);
sock.close();
reject(err);
};
timeout = setTimeout(() => fail(new Error("connectTimeout")), connectTimeout);

const onabort = () => fail(new Error("abort"));
signal?.addEventListener("abort", onabort);

const onerror = (evt: Event) => {
sock.close();
reject(evt.type === "error" ? (evt as ErrorEvent).error : new Error(`${evt}`));
};
sock.addEventListener("error", onerror, { once: true });

sock.addEventListener("open", () => {
clearTimeout(timeout);
sock.removeEventListener("error", onerror);
signal?.removeEventListener("abort", onabort);
resolve(new WsTransport(sock, opts));
});
});
}
return new WsTransport(sock, opts);
}

private constructor(private readonly sock: WebSocket, private readonly opts: WsTransport.Options) {
Expand Down Expand Up @@ -105,7 +87,7 @@ export class WsTransport extends Transport {
}

public close(): void {
this.sock.close();
this.sock.close(1000);
}

/** Reopen the transport by connecting again with the same options. */
Expand All @@ -123,20 +105,15 @@ export namespace WsTransport {
*/
connectTimeout?: number;

/** AbortSignal that allows canceling connection attempt via AbortController. */
signal?: AbortSignal;

/**
* Buffer amount (in bytes) to start TX throttling.
* @defaultValue
* 1 MiB
* @defaultValue 1 MiB
*/
highWaterMark?: number;

/**
* Buffer amount (in bytes) to stop TX throttling.
* @defaultValue
* 16 KiB
* @defaultValue 16 KiB
*/
lowWaterMark?: number;
}
Expand Down

0 comments on commit 60171d5

Please sign in to comment.