From c220dfda24b49584b76d3ee5760032c7a5a601c1 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Wed, 10 May 2023 14:27:41 +1000 Subject: [PATCH] feat: connections and streams now include a metadata getter for the connection info * This is to match the stream interface for Polykey's agnostic RPC streams. * Also including a cancel method to match that interface * Fixes #16 [ci skip] --- src/QUICConnection.ts | 16 +++ src/QUICSocket.ts | 4 +- src/QUICStream.ts | 31 ++++++ src/errors.ts | 5 + src/types.ts | 9 ++ src/utils.ts | 20 ++++ tests/QUICStream.test.ts | 222 +++++++++++++++++++++++++++++++++++---- tests/tlsUtils.ts | 70 +++++++----- 8 files changed, 323 insertions(+), 54 deletions(-) diff --git a/src/QUICConnection.ts b/src/QUICConnection.ts index a45d7f72..93e53e5a 100644 --- a/src/QUICConnection.ts +++ b/src/QUICConnection.ts @@ -6,6 +6,7 @@ import type { QUICConfig } from './config'; import type { Host, Port, RemoteInfo, StreamId } from './types'; import type { Connection, ConnectionErrorCode, SendInfo } from './native/types'; import type { StreamCodeToReason, StreamReasonToCode } from './types'; +import type { ConnectionMetadata } from './types'; import { CreateDestroy, ready, @@ -299,6 +300,21 @@ class QUICConnection extends EventTarget { return this.socket.port; } + public get remoteInfo(): ConnectionMetadata { + const derCerts = this.conn.peerCertChain(); + const remoteCertificates = + derCerts != null + ? derCerts.map((der) => utils.certificateDERToPEM(der)) + : null; + return { + remoteCertificates, + localHost: this.localHost, + localPort: this.localPort, + remoteHost: this.remoteHost, + remotePort: this.remotePort, + }; + } + /** * This provides the ability to destroy with a specific error. This will wait for the connection to fully drain. */ diff --git a/src/QUICSocket.ts b/src/QUICSocket.ts index c1cc69a7..6a9b84e2 100644 --- a/src/QUICSocket.ts +++ b/src/QUICSocket.ts @@ -138,9 +138,7 @@ class QUICSocket extends EventTarget { // Each send/recv/timeout may result in a destruction if (!conn[destroyed]) { // Ignore any errors, concurrent with destruction - await conn.send().catch((e) => { - this.logger.error(`not destroyed send ${e.message}`); - }); + await conn.send().catch(() => {}); } }; diff --git a/src/QUICStream.ts b/src/QUICStream.ts index 8971624a..7e47693b 100644 --- a/src/QUICStream.ts +++ b/src/QUICStream.ts @@ -4,6 +4,7 @@ import type { StreamId, StreamReasonToCode, StreamCodeToReason, + ConnectionMetadata, } from './types'; import type { Connection } from './native/types'; import { ReadableStream, WritableStream } from 'stream/web'; @@ -203,6 +204,21 @@ class QUICStream return this._recvPaused; } + /** + * Connection information including hosts, ports and cert data. + */ + public get remoteInfo(): ConnectionMetadata { + return this.connection.remoteInfo; + } + + /** + * Duplicating `remoteInfo` functionality. + * This strictly exists to work with agnostic RPC stream interface. + */ + public get meta(): ConnectionMetadata { + return this.connection.remoteInfo; + } + /** * This method can be arrived top-down or bottom-up: * @@ -236,6 +252,21 @@ class QUICStream this.logger.info(`Destroyed ${this.constructor.name}`); } + /** + * Used to cancel the streams. This function is synchronous but triggers some asynchronous events. + */ + public cancel(reason?: any): void { + reason = reason ?? new errors.ErrorQUICStreamCancel(); + if (!this._recvClosed) { + this.readableController.error(reason); + void this.closeRecv(true, reason); + } + if (!this._sendClosed) { + this.writableController.error(reason); + void this.closeSend(true, reason); + } + } + /** * External push is converted to internal pull * Internal system decides when to unblock diff --git a/src/errors.ts b/src/errors.ts index ba2a1c3e..8917183a 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -91,6 +91,10 @@ class ErrorQUICStreamClose extends ErrorQUICStream { static description = 'QUIC Stream force close'; } +class ErrorQUICStreamCancel extends ErrorQUICStream { + static description = 'QUIC Stream was cancelled without a provided reason'; +} + class ErrorQUICStreamUnexpectedClose extends ErrorQUICStream { static description = 'QUIC Stream closed early with no reason given'; } @@ -122,6 +126,7 @@ export { ErrorQUICStreamDestroyed, ErrorQUICStreamLocked, ErrorQUICStreamClose, + ErrorQUICStreamCancel, ErrorQUICStreamUnexpectedClose, ErrorQUICUndefinedBehaviour, }; diff --git a/src/types.ts b/src/types.ts index 4d6d5bf9..75dd4303 100644 --- a/src/types.ts +++ b/src/types.ts @@ -88,6 +88,14 @@ type StreamCodeToReason = ( code: number, ) => any | PromiseLike; +type ConnectionMetadata = { + remoteCertificates: Array | null; + localHost: Host; + localPort: Port; + remoteHost: Host; + remotePort: Port; +}; + export type { Opaque, Callback, @@ -104,4 +112,5 @@ export type { RemoteInfo, StreamReasonToCode, StreamCodeToReason, + ConnectionMetadata, }; diff --git a/src/utils.ts b/src/utils.ts index 78defcd0..e447f2ab 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -251,6 +251,24 @@ function never(): never { throw new errors.ErrorQUICUndefinedBehaviour(); } +function certificateDERToPEM(der: Uint8Array): string { + const data = Buffer.from(der); + const contents = + data + .toString('base64') + .replace(/(.{64})/g, '$1\n') + .trimEnd() + '\n'; + return `-----BEGIN CERTIFICATE-----\n${contents}-----END CERTIFICATE-----\n`; +} + +function certificatePEMsToCertChainPem(pems: Array): string { + let certChainPEM = ''; + for (const pem of pems) { + certChainPEM += pem; + } + return certChainPEM; +} + export { isIPv4, isIPv6, @@ -268,4 +286,6 @@ export { encodeConnectionId, decodeConnectionId, never, + certificateDERToPEM, + certificatePEMsToCertChainPem, }; diff --git a/tests/QUICStream.test.ts b/tests/QUICStream.test.ts index 9624821d..c63f6d47 100644 --- a/tests/QUICStream.test.ts +++ b/tests/QUICStream.test.ts @@ -2,13 +2,13 @@ import type * as events from '@/events'; import type { Crypto, Host, Port } from '@'; import { testProp, fc } from '@fast-check/jest'; import Logger, { formatting, LogLevel, StreamHandler } from '@matrixai/logger'; -import { promise } from '@/utils'; +import { destroyed } from '@matrixai/async-init'; +import * as utils from '@/utils'; import QUICServer from '@/QUICServer'; import QUICClient from '@/QUICClient'; import QUICStream from '@/QUICStream'; -import { tlsConfigWithCaArb } from './tlsUtils'; +import { tlsConfigWithCaArb, tlsConfigWithCaGENOKPArb } from './tlsUtils'; import * as testsUtils from './utils'; -import { sleep } from './utils'; describe(QUICStream.name, () => { const logger = new Logger(`${QUICStream.name} Test`, LogLevel.WARN, [ @@ -37,7 +37,8 @@ describe(QUICStream.name, () => { 'should create streams', [tlsConfigWithCaArb, fc.integer({ min: 5, max: 50 }).noShrink()], async (tlsConfigProm, streamsNum) => { - const connectionEventProm = promise(); + const connectionEventProm = + utils.promise(); const tlsConfig = await tlsConfigProm; let server: QUICServer | null = null; let client: QUICClient | null = null; @@ -71,7 +72,7 @@ describe(QUICStream.name, () => { const conn = (await connectionEventProm.p).detail; // Do the test let streamCount = 0; - const streamCreationProm = promise(); + const streamCreationProm = utils.promise(); conn.addEventListener('stream', () => { streamCount += 1; if (streamCount >= streamsNum) streamCreationProm.resolveP(); @@ -87,7 +88,7 @@ describe(QUICStream.name, () => { } await Promise.race([ streamCreationProm.p, - sleep(500).then(() => { + testsUtils.sleep(500).then(() => { throw Error('Creation timed out'); }), ]); @@ -107,7 +108,8 @@ describe(QUICStream.name, () => { fc.uint8Array({ minLength: 1 }).noShrink(), ], async (tlsConfigProm, streamsNum, message) => { - const connectionEventProm = promise(); + const connectionEventProm = + utils.promise(); const tlsConfig = await tlsConfigProm; let server: QUICServer | null = null; let client: QUICClient | null = null; @@ -143,8 +145,8 @@ describe(QUICStream.name, () => { // Do the test let streamCreatedCount = 0; let streamEndedCount = 0; - const streamCreationProm = promise(); - const streamEndedProm = promise(); + const streamCreationProm = utils.promise(); + const streamEndedProm = utils.promise(); conn.addEventListener( 'stream', (asd: events.QUICConnectionStreamEvent) => { @@ -171,7 +173,7 @@ describe(QUICStream.name, () => { } await Promise.race([ streamCreationProm.p, - sleep(100).then(() => { + testsUtils.sleep(100).then(() => { throw Error('Creation timed out'); }), ]); @@ -179,7 +181,7 @@ describe(QUICStream.name, () => { await Promise.allSettled(streams.map((stream) => stream.destroy())); await Promise.race([ streamEndedProm.p, - sleep(100).then(() => { + testsUtils.sleep(100).then(() => { throw Error('Ending timed out'); }), ]); @@ -203,7 +205,8 @@ describe(QUICStream.name, () => { .noShrink(), ], async (tlsConfigProm, streamsData) => { - const connectionEventProm = promise(); + const connectionEventProm = + utils.promise(); const tlsConfig = await tlsConfigProm; let server: QUICServer | null = null; let client: QUICClient | null = null; @@ -296,7 +299,8 @@ describe(QUICStream.name, () => { if (reason === testReason) return 2; return 1; }; - const connectionEventProm = promise(); + const connectionEventProm = + utils.promise(); const tlsConfig = await tlsConfigProm; let server: QUICServer | null = null; let client: QUICClient | null = null; @@ -399,7 +403,8 @@ describe(QUICStream.name, () => { if (reason === testReason) return 2; return 1; }; - const connectionEventProm = promise(); + const connectionEventProm = + utils.promise(); const tlsConfig = await tlsConfigProm; let server: QUICServer | null = null; let client: QUICClient | null = null; @@ -438,7 +443,7 @@ describe(QUICStream.name, () => { const conn = (await connectionEventProm.p).detail; // Do the test const activeServerStreams: Array> = []; - const serverStreamsProm = promise(); + const serverStreamsProm = utils.promise(); let serverStreamNum = 0; conn.addEventListener( 'stream', @@ -453,7 +458,7 @@ describe(QUICStream.name, () => { // Let's make a new streams. const activeClientStreams: Array> = []; const message = Buffer.from('Hello!'); - const serverStreamsDoneProm = promise(); + const serverStreamsDoneProm = utils.promise(); for (let i = 0; i < streamsNum; i++) { activeClientStreams.push( (async () => { @@ -464,7 +469,7 @@ describe(QUICStream.name, () => { await stream.readable.cancel(testReason); await serverStreamsDoneProm.p; // Need time for packets to send/recv - await sleep(100); + await testsUtils.sleep(100); const writeProm = writer.write(message); await writeProm.then( () => { @@ -501,7 +506,8 @@ describe(QUICStream.name, () => { fc.uint8Array({ minLength: 1 }).noShrink(), ], async (tlsConfigProm, streamsNum, message) => { - const connectionEventProm = promise(); + const connectionEventProm = + utils.promise(); const tlsConfig = await tlsConfigProm; let server: QUICServer | null = null; let client: QUICClient | null = null; @@ -536,8 +542,8 @@ describe(QUICStream.name, () => { // Do the test let streamCreatedCount = 0; let streamEndedCount = 0; - const streamCreationProm = promise(); - const streamEndedProm = promise(); + const streamCreationProm = utils.promise(); + const streamEndedProm = utils.promise(); conn.addEventListener( 'stream', (asd: events.QUICConnectionStreamEvent) => { @@ -563,7 +569,7 @@ describe(QUICStream.name, () => { } await Promise.race([ streamCreationProm.p, - sleep(100).then(() => { + testsUtils.sleep(100).then(() => { throw Error('Creation timed out'); }), ]); @@ -571,7 +577,7 @@ describe(QUICStream.name, () => { await client.destroy({ force: true }); await Promise.race([ streamEndedProm.p, - sleep(100).then(() => { + testsUtils.sleep(100).then(() => { throw Error('Ending timed out'); }), ]); @@ -584,4 +590,176 @@ describe(QUICStream.name, () => { }, { numRuns: 10 }, ); + testProp( + 'streams should contain metadata', + [tlsConfigWithCaGENOKPArb, tlsConfigWithCaGENOKPArb], + async (tlsConfigProm1, tlsConfigProm2) => { + const connectionEventProm = + utils.promise(); + const tlsConfig1 = await tlsConfigProm1; + const tlsConfig2 = await tlsConfigProm2; + let server: QUICServer | null = null; + let client: QUICClient | null = null; + try { + server = new QUICServer({ + crypto, + logger: logger.getChild(QUICServer.name), + config: { + tlsConfig: tlsConfig1.tlsConfig, + verifyPeer: true, + verifyPem: tlsConfig2.ca.certChainPem, + }, + }); + server.addEventListener( + 'connection', + (e: events.QUICServerConnectionEvent) => + connectionEventProm.resolveP(e), + ); + await server.start({ + host: '127.0.0.1' as Host, + }); + client = await QUICClient.createQUICClient({ + host: '127.0.0.1' as Host, + port: server.port, + localHost: '127.0.0.1' as Host, + crypto, + logger: logger.getChild(QUICClient.name), + config: { + verifyPeer: false, + tlsConfig: tlsConfig2.tlsConfig, + }, + }); + const conn = (await connectionEventProm.p).detail; + // Do the test + const serverStreamProm = utils.promise(); + conn.addEventListener( + 'stream', + (event: events.QUICConnectionStreamEvent) => { + serverStreamProm.resolveP(event.detail); + }, + ); + // Lets make a new streams. + const message = Buffer.from('Hello!'); + const clientStream = await client.connection.streamNew(); + const writer = clientStream.writable.getWriter(); + await writer.write(message); + writer.releaseLock(); + await Promise.race([ + serverStreamProm.p, + testsUtils.sleep(500).then(() => { + throw Error('Creation timed out'); + }), + ]); + const clientMetadata = clientStream.remoteInfo; + expect(clientMetadata.localHost).toBe(client.host); + expect(clientMetadata.localPort).toBe(client.port); + expect(clientMetadata.remoteHost).toBe(server.host); + expect(clientMetadata.remotePort).toBe(server.port); + expect(clientMetadata.remoteCertificates?.length).toBeGreaterThan(0); + const clientPemChain = utils.certificatePEMsToCertChainPem( + clientMetadata.remoteCertificates!, + ); + expect(clientPemChain).toEqual(tlsConfig1.tlsConfig.certChainPem); + + const serverStream = await serverStreamProm.p; + const serverMetadata = serverStream.remoteInfo; + expect(serverMetadata.localHost).toBe(server.host); + expect(serverMetadata.localPort).toBe(server.port); + expect(serverMetadata.remoteHost).toBe(client.host); + expect(serverMetadata.remotePort).toBe(client.port); + expect(serverMetadata.remoteCertificates?.length).toBeGreaterThan(0); + const serverPemChain = utils.certificatePEMsToCertChainPem( + serverMetadata.remoteCertificates!, + ); + expect(serverPemChain).toEqual(tlsConfig2.tlsConfig.certChainPem); + } finally { + await client?.destroy({ force: true }); + await server?.stop({ force: true }); + } + }, + { numRuns: 1 }, + ); + testProp( + 'streams can be cancelled', + [tlsConfigWithCaGENOKPArb, tlsConfigWithCaGENOKPArb], + async (tlsConfigProm1, tlsConfigProm2) => { + const cancelReason = Symbol('CancelReason'); + const connectionEventProm = + utils.promise(); + const tlsConfig1 = await tlsConfigProm1; + const tlsConfig2 = await tlsConfigProm2; + let server: QUICServer | null = null; + let client: QUICClient | null = null; + try { + server = new QUICServer({ + crypto, + logger: logger.getChild(QUICServer.name), + config: { + tlsConfig: tlsConfig1.tlsConfig, + verifyPeer: true, + verifyPem: tlsConfig2.ca.certChainPem, + }, + }); + server.addEventListener( + 'connection', + (e: events.QUICServerConnectionEvent) => + connectionEventProm.resolveP(e), + ); + await server.start({ + host: '127.0.0.1' as Host, + }); + client = await QUICClient.createQUICClient({ + host: '127.0.0.1' as Host, + port: server.port, + localHost: '127.0.0.1' as Host, + crypto, + logger: logger.getChild(QUICClient.name), + config: { + verifyPeer: false, + tlsConfig: tlsConfig2.tlsConfig, + }, + }); + const conn = (await connectionEventProm.p).detail; + // Do the test + const serverStreamProm = utils.promise(); + conn.addEventListener( + 'stream', + (event: events.QUICConnectionStreamEvent) => { + serverStreamProm.resolveP(event.detail); + }, + ); + // Lets make a new streams. + const message = Buffer.from('Hello!'); + const clientStream = await client.connection.streamNew(); + const writer = clientStream.writable.getWriter(); + await writer.write(message); + writer.releaseLock(); + await Promise.race([ + serverStreamProm.p, + testsUtils.sleep(500).then(() => { + throw Error('Creation timed out'); + }), + ]); + clientStream.cancel(cancelReason); + await expect(clientStream.readable.getReader().read()).rejects.toBe( + cancelReason, + ); + await expect(clientStream.writable.getWriter().write()).rejects.toBe( + cancelReason, + ); + // Let's check that the server side ended + const serverStream = await serverStreamProm.p; + await expect( + serverStream.readable.pipeTo(serverStream.writable), + ).rejects.toThrow(); + // And client stream should've cleaned up + await testsUtils.sleep(100); + expect(clientStream[destroyed]).toBeTrue(); + } finally { + await client?.destroy({ force: true }); + await server?.stop({ force: true }); + } + }, + { numRuns: 5 }, + ); }); diff --git a/tests/tlsUtils.ts b/tests/tlsUtils.ts index 17a06069..53d57280 100644 --- a/tests/tlsUtils.ts +++ b/tests/tlsUtils.ts @@ -330,37 +330,45 @@ const tlsConfigArb = (keyPairs: fc.Arbitrary> = keyPairsArb()) => .map(async (keyPairs) => await createTLSConfigWithChain(keyPairs)) .noShrink(); +const tlsConfigWithCaRSAArb = fc.record({ + type: fc.constant('RSA'), + ca: fc.constant(certFixtures.tlsConfigMemRSACa), + tlsConfig: certFixtures.tlsConfigRSAExampleArb, +}); + +const tlsConfigWithCaOKPArb = fc.record({ + type: fc.constant('OKP'), + ca: fc.constant(certFixtures.tlsConfigMemOKPCa), + tlsConfig: certFixtures.tlsConfigOKPExampleArb, +}); + +const tlsConfigWithCaECDSAArb = fc.record({ + type: fc.constant('ECDSA'), + ca: fc.constant(certFixtures.tlsConfigMemECDSACa), + tlsConfig: certFixtures.tlsConfigECDSAExampleArb, +}); + +const tlsConfigWithCaGENOKPArb = tlsConfigArb().map(async (configProm) => { + const config = await configProm; + return { + type: fc.constant('GEN-OKP'), + tlsConfig: { + certChainPem: config.certChainPem, + privKeyPem: config.privKeyPem, + }, + ca: { + certChainPem: config.caPem, + privKeyPem: '', + }, + }; +}); + const tlsConfigWithCaArb = fc .oneof( - fc.record({ - type: fc.constant('RSA'), - ca: fc.constant(certFixtures.tlsConfigMemRSACa), - tlsConfig: certFixtures.tlsConfigRSAExampleArb, - }), - fc.record({ - type: fc.constant('OKP'), - ca: fc.constant(certFixtures.tlsConfigMemOKPCa), - tlsConfig: certFixtures.tlsConfigOKPExampleArb, - }), - fc.record({ - type: fc.constant('ECDSA'), - ca: fc.constant(certFixtures.tlsConfigMemECDSACa), - tlsConfig: certFixtures.tlsConfigECDSAExampleArb, - }), - tlsConfigArb().map(async (configProm) => { - const config = await configProm; - return { - type: fc.constant('GEN-OKP'), - tlsConfig: { - certChainPem: config.certChainPem, - privKeyPem: config.privKeyPem, - }, - ca: { - certChainPem: config.caPem, - privKeyPem: '', - }, - }; - }), + tlsConfigWithCaRSAArb, + tlsConfigWithCaOKPArb, + tlsConfigWithCaECDSAArb, + tlsConfigWithCaGENOKPArb, ) .noShrink(); @@ -372,4 +380,8 @@ export { keyPairsArb, tlsConfigArb, tlsConfigWithCaArb, + tlsConfigWithCaRSAArb, + tlsConfigWithCaOKPArb, + tlsConfigWithCaECDSAArb, + tlsConfigWithCaGENOKPArb, };