diff --git a/src/QUICConnection.ts b/src/QUICConnection.ts index 74c11041..99946ff0 100644 --- a/src/QUICConnection.ts +++ b/src/QUICConnection.ts @@ -632,6 +632,7 @@ class QUICConnection extends EventTarget { logger: this.logger.getChild(`${QUICStream.name} ${streamId!}-${Math.floor(Math.random() * 100)}`), }); const writer = quicStream.writable.getWriter(); + const reader = quicStream.readable.getReader(); try { // This will now wait until the 0-length buffer is actually sent @@ -639,10 +640,15 @@ class QUICConnection extends EventTarget { // But its a stream, messages can be coalesced, so ignore first byte? IDK... // We also want to do a ping/pong message to see if stream creation is rejected. await writer.write(new Uint8Array(1)); + // Ignore the first response message. + // await reader.read(); writer.releaseLock(); + reader.releaseLock(); } catch (e) { + console.error(e); // You must release the lock even before you run destroy writer.releaseLock(); + reader.releaseLock(); // If the write failed, it will only close the sending side // But in this case, it means we actually failed to open the stream entirely // In which case we destroy the stream diff --git a/src/QUICStream.ts b/src/QUICStream.ts index a69327d7..2befed7b 100644 --- a/src/QUICStream.ts +++ b/src/QUICStream.ts @@ -53,6 +53,8 @@ class QUICStream // This resolves when `streamRecv` results in a `StreamReset(u64)` or a fin flag indicating receiving has ended protected recvFinishedProm = utils.promise(); protected destroyingMap: Map; + protected firstRecv = true; + protected firstSend = true; /** * For `reasonToCode`, return 0 means "unknown reason" @@ -132,7 +134,10 @@ class QUICStream cancel: async (reason) => { await this.closeRecv(true, reason); }, - }); + }, + { + highWaterMark: 2, + }); this.writable = new WritableStream({ start: (controller) => { @@ -147,6 +152,7 @@ class QUICStream // with the `fin` set to true // If this itself results in an error, we can continue // But continue to do the below + this.logger.info('sending fin frame'); await this.streamSend(new Uint8Array(0), true).catch((e) => { // Ignore send error if stream is already closed if (e.message !== 'send') throw e; @@ -241,7 +247,8 @@ class QUICStream @ready(new errors.ErrorQUICStreamDestroyed(), false, ['destroying']) public read(): void { // After reading it's possible the writer had a state change. - void this.isFinished(); + // void this.isFinished(); + // void this.isRecvFinished(); if (this._recvPaused) { // Do nothing if we are paused this.logger.info('Skipping read, paused'); @@ -256,8 +263,8 @@ class QUICStream */ @ready(new errors.ErrorQUICStreamDestroyed(), false, ['destroying']) public write(): void { - // Checking if streams have ended - void this.isFinished(); + // Checking if writable has ended + void this.isSendFinished(); if (this.resolveWritableP != null) { this.resolveWritableP(); } @@ -318,10 +325,12 @@ class QUICStream protected async streamRecv(): Promise { const buf = Buffer.alloc(1024); let recvLength: number, fin: boolean; + console.trace('asd') this.logger.info('trying receiving'); try { [recvLength, fin] = this.conn.streamRecv(this.streamId, buf); } catch (e) { + this.logger.error(e.message) if (e.message === 'Done') { // When it is reported to be `Done`, it just means that there is no data to read // it does not mean that the stream is closed or finished @@ -355,10 +364,14 @@ class QUICStream // Let's check if sending side has finished await this.connection.send(); } - // It's possible to get a 0-length buffer - // In fact 0-length buffers are used to "open" a stream - if (recvLength > 0) { - if (!this._recvClosed) { + if (!this._recvClosed) { + // First byte is used to open stream, we need to ignore this + if (this.firstRecv) { + this.firstRecv = false; + console.log('FIRST', recvLength); + // If message is only one byte then we skip + if (recvLength > 1) this.readableController.enqueue(buf.subarray(1, recvLength)); + } else { this.readableController.enqueue(buf.subarray(0, recvLength)); } } @@ -373,6 +386,7 @@ class QUICStream return; } // Now we pause receiving if the queue is full + console.log('asd', this.readableController.desiredSize); if ( this.readableController.desiredSize != null && this.readableController.desiredSize <= 0 @@ -509,8 +523,9 @@ class QUICStream type: 'recv' | 'send', ): Promise { const match = - e.message.match(/StreamStopped\((.+)\)/) ?? - e.message.match(/InvalidStreamState\((.+)\)/); + e.message.match(/StreamStopped\((.+)\)/) + ?? e.message.match(/InvalidStreamState\((.+)\)/) + ?? e.message.match(/StreamReset\((.+)\)/); if (match != null) { const code = parseInt(match[1]); return await this.codeToReason(type, code); diff --git a/tests/QUICStream.test.ts b/tests/QUICStream.test.ts index 8c663728..329e0e00 100644 --- a/tests/QUICStream.test.ts +++ b/tests/QUICStream.test.ts @@ -11,7 +11,7 @@ import * as testsUtils from './utils'; import { sleep } from './utils'; describe(QUICStream.name, () => { - const logger = new Logger(`${QUICStream.name} Test`, LogLevel.WARN, [ + const logger = new Logger(`${QUICStream.name} Test`, LogLevel.INFO, [ new StreamHandler( formatting.format`${formatting.level}:${formatting.keys}:${formatting.msg}`, ), @@ -288,7 +288,105 @@ describe(QUICStream.name, () => { }, { numRuns: 10 }, ); - test.todo('should send data over stream'); + testProp.only( + 'should send data over stream', + [ + tlsConfigWithCaArb, + fc.array( + fc.array(fc.uint8Array( { minLength: 1})), + { + minLength: 1, + maxLength: 1, + }, + ).noShrink()], + async (tlsConfigProm, streamsData) => { + const connectionEventProm = promise(); + const tlsConfig = await tlsConfigProm; + let server: QUICServer | null = null; + let client: QUICClient | null = null; + try { + server = new QUICServer({ + crypto, + logger: logger.getChild(QUICServer.name), + config: { + tlsConfig: tlsConfig.tlsConfig, + verifyPeer: false, + logKeys: './tmp/key.log', + }, + }); + server.addEventListener( + 'connection', + (e: events.QUICServerConnectionEvent) => + connectionEventProm.resolveP(e), + ); + await server.start({ + host: '127.0.0.1' as Host, + port: 55555 as Port, + }); + client = await QUICClient.createQUICClient({ + host: '::ffff:127.0.0.1' as Host, + port: server.port, + localHost: '::' as Host, + crypto, + logger: logger.getChild(QUICClient.name), + config: { + verifyPeer: false, + }, + }); + const conn = (await connectionEventProm.p).detail; + // Do the test + const activeServerStreams: Array> = []; + conn.addEventListener('stream', (streamEvent: events.QUICConnectionStreamEvent) => { + const stream = streamEvent.detail; + const streamProm = stream.readable.pipeTo(stream.writable) + activeServerStreams.push(streamProm); + }); + + // Let's make a new streams. + const activeClientStreams: Array> = []; + for (const data of streamsData) { + activeClientStreams.push((async () => { + const stream = await client.connection.streamNew(); + console.log('a'); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + console.log('a'); + // do write and read messages here. + for (const message of data) { + console.log(message) + await writer.write(message); + console.log('a'); + const readMessage = await reader.read(); + expect(readMessage.done).toBeFalse(); + expect(readMessage.value).toStrictEqual(message); + } + console.log('B'); + await writer.close(); + const value = await reader.read(); + expect(value.done).toBeTrue(); + console.log('B'); + })()); + } + console.log('waiting'); + await Promise.all([ + Promise.all(activeClientStreams), + // Promise.all(activeServerStreams), + ]) + console.log('test done'); + } catch(e) { + console.log('ERROR'); + console.error(e); + throw e; + } finally { + console.log('finally'); + await client?.destroy({ force: true }); + console.log('a'); + await server?.stop({ force: true }); + console.log('b'); + } + }, + { numRuns: 1 }, + ); test.todo('should propagate errors over stream'); test.todo('should clean up streams when connection ends'); test.todo('should handle rejected stream creation');