Skip to content

Commit

Permalink
feat: allow force stopping a QUICSocket
Browse files Browse the repository at this point in the history
This will allow easy cleanup after tests to ensure the process doesn't hold open. Everything else can be handled via garbage collection.

I removed the register client method, It's not needed since we check against the connection map before stopping.

* Related #14

[ci skip]
  • Loading branch information
tegefaulkes authored and CMCDragonkai committed May 17, 2023
1 parent 5fcdba4 commit 9c16b5a
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 40 deletions.
1 change: 0 additions & 1 deletion src/QUICClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ class QUICClient extends EventTarget {
this.socket = socket;
this.isSocketShared = isSocketShared;
// Registers itself to the socket
this.socket.registerClient(this);
if (!isSocketShared) {
this.socket.addEventListener('error', this.handleQUICSocketError);
}
Expand Down
48 changes: 16 additions & 32 deletions src/QUICSocket.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type QUICClient from './QUICClient';
import type QUICServer from './QUICServer';
import type QUICConnection from './QUICConnection';
import type { Crypto, Host, Hostname, Port } from './types';
Expand Down Expand Up @@ -50,12 +49,12 @@ class QUICSocket extends EventTarget {
* If it is non-QUIC, we can discard the data.
* If there are multiple coalesced QUIC packets, it is expected that
* all packets are intended for the same connection. This means we only
* need to parse the first QUIC packet to determinine what connection to route
* need to parse the first QUIC packet to determining what connection to route
* the data to.
*/
protected handleSocketMessage = async (
data: Buffer,
rinfo: dgram.RemoteInfo,
remoteInfo: dgram.RemoteInfo,
) => {
// The data buffer may have multiple coalesced QUIC packets.
// This header is parsed from the first packet.
Expand Down Expand Up @@ -92,9 +91,9 @@ class QUICSocket extends EventTarget {
quiche.MAX_CONN_ID_LEN,
);

const remoteInfo = {
host: rinfo.address as Host,
port: rinfo.port as Port,
const remoteInfo_ = {
host: remoteInfo.address as Host,
port: remoteInfo.port as Port,
};

// Now both must be checked
Expand All @@ -107,7 +106,7 @@ class QUICSocket extends EventTarget {
}
const conn_ = await this.server.connectionNew(
data,
remoteInfo,
remoteInfo_,
header,
dcid,
scid,
Expand All @@ -126,7 +125,7 @@ class QUICSocket extends EventTarget {
// When we register a client, we have to put the connection in our
// connection map
}
await conn.recv(data, remoteInfo);
await conn.recv(data, remoteInfo_);

// The `conn.recv` now may actually destroy the connection
// In that sense, there's nothing to send
Expand Down Expand Up @@ -273,10 +272,16 @@ class QUICSocket extends EventTarget {
this.logger.info(`Started ${this.constructor.name} on ${address}`);
}

public async stop(): Promise<void> {
/**
* Will stop the socket.
* An `ErrorQUICSocketConnectionsActive` will be thrown if there are active connections.
* If force is true, it will skip checking connections and stop the socket.
* @param force - Will force the socket to end even if there are active connections, used for cleaning up after tests.
*/
public async stop(force = false): Promise<void> {
const address = utils.buildAddress(this._host, this._port);
this.logger.info(`Stop ${this.constructor.name} on ${address}`);
if (this.connectionMap.size > 0) {
if (!force && this.connectionMap.size > 0) {
throw new errors.ErrorQUICSocketConnectionsActive(
`Cannot stop QUICSocket with ${this.connectionMap.size} active connection(s)`,
);
Expand Down Expand Up @@ -354,27 +359,6 @@ class QUICSocket extends EventTarget {
return this.socketSend(...params);
}

/**
* Registers a client to the socket
* This is a new client, but clients don't die by itself?
*/
public registerClient(client: QUICClient) {
// So what really this does?
// Is this about creating a connection?
// So we can add the connection to the map?
// And if we are doing
// QUICConnection.createQUICConnection
// Then that means, we are really creating that connection in the async creator
// That means the async creator needs to create teh `connection` and call it too
this.logger.error('registerClient IS NOT IMPLEMENTED!');
}

// But we already have a connection map
// well yea, we are checking liveness of connections
// But client destruction is only way to destory connections
// But if the client connection fails
// we need to simultaneously destroy the client

/**
* Sets a single server to the socket
* You can only have 1 server for the socket
Expand All @@ -388,7 +372,7 @@ class QUICSocket extends EventTarget {
* Just go straight to calling a thing
* We can call this.server.handleConnection()
* Why `handleConnection` because technically it's built on top of the handleMessage
* Thatbecomes the key idea there
* That becomes the key idea there
* handleNewConnection
* And all sorts of other stuff!
* Or whatever it needs to be
Expand Down
52 changes: 45 additions & 7 deletions tests/QUICSocket.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { Crypto, Host, Hostname, Port } from '@/types';
import type { Crypto, Host } from '@/types';
import type QUICConnection from '@/QUICConnection';
import dgram from 'dgram';
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import QUICSocket from '@/QUICSocket';
import * as utils from '@/utils';
import * as errors from '@/errors';
import QUICConnectionId from '@/QUICConnectionId';
import * as testsUtils from './utils';

describe(QUICSocket.name, () => {
Expand All @@ -19,7 +21,7 @@ describe(QUICSocket.name, () => {
let ipv6Socket: dgram.Socket;
let dualStackSocket: dgram.Socket;
let ipv4SocketBind: (port: number, host: string) => Promise<void>;
let ipv4SocketSend: (...params: Array<any>) => Promise<number>;
let _ipv4SocketSend: (...params: Array<any>) => Promise<number>;
let ipv4SocketClose: () => Promise<void>;
let ipv4SocketPort: number;
// Handle IPv4 messages
Expand All @@ -32,7 +34,7 @@ describe(QUICSocket.name, () => {
ipv4SocketMessageResolveP = resolveP;
};
let ipv6SocketBind: (port: number, host: string) => Promise<void>;
let ipv6SocketSend: (...params: Array<any>) => Promise<number>;
let _ipv6SocketSend: (...params: Array<any>) => Promise<number>;
let ipv6SocketClose: () => Promise<void>;
let ipv6SocketPort: number;
// Handle IPv6 messages
Expand All @@ -45,7 +47,7 @@ describe(QUICSocket.name, () => {
ipv6SocketMessageResolveP = resolveP;
};
let dualStackSocketBind: (port: number, host: string) => Promise<void>;
let dualStackSocketSend: (...params: Array<any>) => Promise<number>;
let _dualStackSocketSend: (...params: Array<any>) => Promise<number>;
let dualStackSocketClose: () => Promise<void>;
let dualStackSocketPort: number;
// Handle dual stack messages
Expand Down Expand Up @@ -81,15 +83,15 @@ describe(QUICSocket.name, () => {
ipv6Only: false,
});
ipv4SocketBind = utils.promisify(ipv4Socket.bind).bind(ipv4Socket);
ipv4SocketSend = utils.promisify(ipv4Socket.send).bind(ipv4Socket);
_ipv4SocketSend = utils.promisify(ipv4Socket.send).bind(ipv4Socket);
ipv4SocketClose = utils.promisify(ipv4Socket.close).bind(ipv4Socket);
ipv6SocketBind = utils.promisify(ipv6Socket.bind).bind(ipv6Socket);
ipv6SocketSend = utils.promisify(ipv6Socket.send).bind(ipv6Socket);
_ipv6SocketSend = utils.promisify(ipv6Socket.send).bind(ipv6Socket);
ipv6SocketClose = utils.promisify(ipv6Socket.close).bind(ipv6Socket);
dualStackSocketBind = utils
.promisify(dualStackSocket.bind)
.bind(dualStackSocket);
dualStackSocketSend = utils
_dualStackSocketSend = utils
.promisify(dualStackSocket.send)
.bind(dualStackSocket);
dualStackSocketClose = utils
Expand Down Expand Up @@ -506,4 +508,40 @@ describe(QUICSocket.name, () => {
]);
});
});
test('socket should throw if stopped with active connections', async () => {
const socket = new QUICSocket({
crypto,
logger,
});
await socket.start({
host: '127.0.0.1' as Host,
});
const connectionId = QUICConnectionId.fromBuffer(
Buffer.from('SomeRandomId'),
);
socket.connectionMap.set(connectionId, {
type: 'client',
} as QUICConnection);
await expect(socket.stop()).rejects.toThrow(
errors.ErrorQUICSocketConnectionsActive,
);
socket.connectionMap.delete(connectionId);
await expect(socket.stop()).toResolve();
});
test('socket should stop when forced with active connections', async () => {
const socket = new QUICSocket({
crypto,
logger,
});
await socket.start({
host: '127.0.0.1' as Host,
});
const connectionId = QUICConnectionId.fromBuffer(
Buffer.from('SomeRandomId'),
);
socket.connectionMap.set(connectionId, {
type: 'client',
} as QUICConnection);
await expect(socket.stop(true)).toResolve();
});
});

0 comments on commit 9c16b5a

Please sign in to comment.