Skip to content
This repository has been archived by the owner on Aug 11, 2020. It is now read-only.

Commit

Permalink
quic: add pending state for streams
Browse files Browse the repository at this point in the history
This resolves the situation in which `openStream()` is called before
the underlying native `QuicSession` handle exists.

Fixes: #181
PR-URL: #187
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
addaleax committed Dec 11, 2019
1 parent 1236345 commit 8640fb8
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 21 deletions.
19 changes: 19 additions & 0 deletions doc/api/quic.md
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,15 @@ stream('initialHeaders', (headers) => {
});
```

### Event: `'ready'`
<!-- YAML
added: REPLACEME
-->

Emitted when the underlying `QuicSession` has emitted its `secure` event
this stream has received its id, which is accessible as `stream.id` once this
event is emitted.

### Event: `'trailingHeaders'`
<!-- YAML
added: REPLACEME
Expand Down Expand Up @@ -1157,6 +1166,16 @@ added: REPLACEME

The numeric identifier of the `QuicStream`.

### quicstream.pending
<!-- YAML
added: REPLACEME
-->

* {boolean}

This property is `true` if the underlying session is not finished yet,
i.e. before the `'ready'` event is emitted.

### quicstream.serverInitiated
<!-- YAML
added: REPLACEME
Expand Down
77 changes: 56 additions & 21 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,8 @@ function onStreamReady(streamHandle, id) {

// TODO(@jasnell): Get default options from session
const uni = id & 0b10;
const stream = new QuicStream({ writable: !uni }, session, id, streamHandle);
const stream = new QuicStream({ writable: !uni }, session);
stream[kSetHandle](streamHandle);
if (uni)
stream.end();
session[kAddStream](id, stream);
Expand Down Expand Up @@ -1668,30 +1669,37 @@ class QuicSession extends EventEmitter {
if (halfOpen !== undefined && typeof halfOpen !== 'boolean')
throw new ERR_INVALID_ARG_TYPE('options.halfOpen', 'boolean', halfOpen);

const handle =
halfOpen ?
_openUnidirectionalStream(this[kHandle]) :
_openBidirectionalStream(this[kHandle]);
if (handle === undefined)
throw new ERR_QUICSTREAM_OPEN_FAILED();

const id = handle.id();
const stream = new QuicStream(
{
highWaterMark,
readable: !halfOpen
},
this,
id,
handle);
this);
if (halfOpen) {
stream.push(null);
stream.read();
}
this[kAddStream](id, stream);

if (!this.#handshakeComplete)
this.on('secure', QuicSession.#makeStream.bind(this, stream, halfOpen));
else
QuicSession.#makeStream.call(this, stream, halfOpen);

return stream;
}

static #makeStream = function(stream, halfOpen) {
const handle =
halfOpen ?
_openUnidirectionalStream(this[kHandle]) :
_openBidirectionalStream(this[kHandle]);
if (handle === undefined)
this.emit('error', new ERR_QUICSTREAM_OPEN_FAILED());

stream[kSetHandle](handle);
this[kAddStream](stream.id, stream);
}

get duration() {
const now = process.hrtime.bigint();
const stats = this.#stats || this[kHandle].stats;
Expand Down Expand Up @@ -2056,18 +2064,13 @@ class QuicStream extends Duplex {
#dataSizeHistogram = undefined;
#dataAckHistogram = undefined;

constructor(options, session, id, handle) {
constructor(options, session) {
super({
...options,
allowHalfOpen: true,
decodeStrings: true,
emitClose: true
});
handle.onread = onStreamRead;
handle[owner_symbol] = this;
this[async_id_symbol] = handle.getAsyncId();
this[kSetHandle](handle);
this.#id = id;
this.#session = session;
this._readableState.readingMore = true;
this.on('pause', streamOnPause);
Expand Down Expand Up @@ -2097,9 +2100,16 @@ class QuicStream extends Duplex {
[kSetHandle](handle) {
this[kHandle] = handle;
if (handle !== undefined) {
handle.onread = onStreamRead;
handle[owner_symbol] = this;
this[async_id_symbol] = handle.getAsyncId();
this.#id = handle.id();

this.#dataRateHistogram = new Histogram(handle.data_rx_rate);
this.#dataSizeHistogram = new Histogram(handle.data_rx_size);
this.#dataAckHistogram = new Histogram(handle.data_rx_ack);

this.emit('ready');
} else {
if (this.#dataRateHistogram)
this.#dataRateHistogram[kDestroyHistogram]();
Expand Down Expand Up @@ -2148,6 +2158,9 @@ class QuicStream extends Duplex {
if (this.destroyed || this.#closed)
return;

if (this.pending)
return this.on('ready', () => this[kClose](family, code));

this.#closed = true;

this.#aborted = this.readable || this.writable;
Expand Down Expand Up @@ -2175,6 +2188,10 @@ class QuicStream extends Duplex {
process.nextTick(emit.bind(this, 'abort', code, family));
}

get pending() {
return this.#id === undefined;
}

get aborted() {
return this.#aborted;
}
Expand Down Expand Up @@ -2226,7 +2243,13 @@ class QuicStream extends Duplex {

[kWriteGeneric](writev, data, encoding, cb) {
if (this.destroyed)
return;
return; // TODO(addaleax): Can this happen?

if (this.pending) {
return this.on('ready', () => {
this[kWriteGeneric](writev, data, encoding, cb);
});
}

this[kUpdateTimer]();
const req = (writev) ?
Expand All @@ -2251,6 +2274,9 @@ class QuicStream extends Duplex {
// coming so that a fin stream packet can be
// sent.
_final(cb) {
if (this.pending)
return this.on('ready', () => this._final(cb));

const handle = this[kHandle];
if (handle === undefined) {
cb();
Expand All @@ -2267,7 +2293,10 @@ class QuicStream extends Duplex {
}

_read(nread) {
if (this.destroyed) {
if (this.pending)
return this.on('ready', () => this._read(nread));

if (this.destroyed) { // TODO(addaleax): Can this happen?
this.push(null);
return;
}
Expand Down Expand Up @@ -2317,6 +2346,12 @@ class QuicStream extends Duplex {
else if (typeof fd !== 'number')
throw new ERR_INVALID_ARG_TYPE('fd', ['number', 'FileHandle'], fd);

if (this.pending) {
return this.on('ready', () => {
this.sendFD(fd, { offset, length }, ownsFd);
});
}

this[kUpdateTimer]();
this.ownsFd = ownsFd;

Expand Down
70 changes: 70 additions & 0 deletions test/parallel/test-quic-openstream-pending.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
'use strict';
const common = require('../common');
if (!common.hasQuic)
common.skip('missing quic');

// Test that opening a stream works even if the session isn’t ready yet.

const assert = require('assert');
const quic = require('quic');

const fixtures = require('../common/fixtures');
const key = fixtures.readKey('agent1-key.pem', 'binary');
const cert = fixtures.readKey('agent1-cert.pem', 'binary');
const ca = fixtures.readKey('ca1-cert.pem', 'binary');

const server = quic.createSocket({ port: 0, validateAddress: true });

server.listen({
key,
cert,
ca,
rejectUnauthorized: false,
maxCryptoBuffer: 4096,
alpn: 'meow',
maxStreamsUni: 100
});

server.on('session', common.mustCall((session) => {
session.on('stream', common.mustCall((stream) => {
let data = '';
stream.setEncoding('utf8');
stream.on('data', (chunk) => data += chunk);
stream.on('end', common.mustCall(() => {
assert.strictEqual(data, 'Hello!');
session.close();
server.close();
}));
}));

session.on('close', common.mustCall());
}));

server.on('ready', common.mustCall(() => {
const client = quic.createSocket({
port: 0,
client: {
key,
cert,
ca,
alpn: 'meow'
}
});

const req = client.connect({
address: 'localhost',
port: server.address.port
});

const stream = req.openStream({ halfOpen: true });
stream.end('Hello!');

assert.strictEqual(stream.pending, true);
stream.on('ready', common.mustCall(() => {
assert.strictEqual(stream.pending, false);
}));

req.on('close', common.mustCall(() => client.close()));
}));

server.on('close', common.mustCall());

0 comments on commit 8640fb8

Please sign in to comment.