Skip to content

Commit

Permalink
quic: add pending state for streams
Browse files Browse the repository at this point in the history
This resolves the situation in which `openStream()` is called before
the underlying native `QuicSession` handle exists.

Fixes: nodejs#181
PR-URL: nodejs#187
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
addaleax authored and juanarbol committed Dec 17, 2019
1 parent 2ac60e0 commit 392f3f2
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 21 deletions.
19 changes: 19 additions & 0 deletions doc/api/quic.md
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,15 @@ stream('initialHeaders', (headers) => {
});
```

### Event: `'ready'`
<!-- YAML
added: REPLACEME
-->

Emitted when the underlying `QuicSession` has emitted its `secure` event
this stream has received its id, which is accessible as `stream.id` once this
event is emitted.

### Event: `'trailingHeaders'`
<!-- YAML
added: REPLACEME
Expand Down Expand Up @@ -1157,6 +1166,16 @@ added: REPLACEME

The numeric identifier of the `QuicStream`.

### quicstream.pending
<!-- YAML
added: REPLACEME
-->

* {boolean}

This property is `true` if the underlying session is not finished yet,
i.e. before the `'ready'` event is emitted.

### quicstream.serverInitiated
<!-- YAML
added: REPLACEME
Expand Down
77 changes: 56 additions & 21 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,8 @@ function onStreamReady(streamHandle, id) {

// TODO(@jasnell): Get default options from session
const uni = id & 0b10;
const stream = new QuicStream({ writable: !uni }, session, id, streamHandle);
const stream = new QuicStream({ writable: !uni }, session);
stream[kSetHandle](streamHandle);
if (uni)
stream.end();
session[kAddStream](id, stream);
Expand Down Expand Up @@ -1668,30 +1669,37 @@ class QuicSession extends EventEmitter {
if (halfOpen !== undefined && typeof halfOpen !== 'boolean')
throw new ERR_INVALID_ARG_TYPE('options.halfOpen', 'boolean', halfOpen);

const handle =
halfOpen ?
_openUnidirectionalStream(this[kHandle]) :
_openBidirectionalStream(this[kHandle]);
if (handle === undefined)
throw new ERR_QUICSTREAM_OPEN_FAILED();

const id = handle.id();
const stream = new QuicStream(
{
highWaterMark,
readable: !halfOpen
},
this,
id,
handle);
this);
if (halfOpen) {
stream.push(null);
stream.read();
}
this[kAddStream](id, stream);

if (!this.#handshakeComplete)
this.on('secure', QuicSession.#makeStream.bind(this, stream, halfOpen));
else
QuicSession.#makeStream.call(this, stream, halfOpen);

return stream;
}

static #makeStream = function(stream, halfOpen) {
const handle =
halfOpen ?
_openUnidirectionalStream(this[kHandle]) :
_openBidirectionalStream(this[kHandle]);
if (handle === undefined)
this.emit('error', new ERR_QUICSTREAM_OPEN_FAILED());

stream[kSetHandle](handle);
this[kAddStream](stream.id, stream);
}

get duration() {
const now = process.hrtime.bigint();
const stats = this.#stats || this[kHandle].stats;
Expand Down Expand Up @@ -2056,18 +2064,13 @@ class QuicStream extends Duplex {
#dataSizeHistogram = undefined;
#dataAckHistogram = undefined;

constructor(options, session, id, handle) {
constructor(options, session) {
super({
...options,
allowHalfOpen: true,
decodeStrings: true,
emitClose: true
});
handle.onread = onStreamRead;
handle[owner_symbol] = this;
this[async_id_symbol] = handle.getAsyncId();
this[kSetHandle](handle);
this.#id = id;
this.#session = session;
this._readableState.readingMore = true;
this.on('pause', streamOnPause);
Expand Down Expand Up @@ -2097,9 +2100,16 @@ class QuicStream extends Duplex {
[kSetHandle](handle) {
this[kHandle] = handle;
if (handle !== undefined) {
handle.onread = onStreamRead;
handle[owner_symbol] = this;
this[async_id_symbol] = handle.getAsyncId();
this.#id = handle.id();

this.#dataRateHistogram = new Histogram(handle.data_rx_rate);
this.#dataSizeHistogram = new Histogram(handle.data_rx_size);
this.#dataAckHistogram = new Histogram(handle.data_rx_ack);

this.emit('ready');
} else {
if (this.#dataRateHistogram)
this.#dataRateHistogram[kDestroyHistogram]();
Expand Down Expand Up @@ -2148,6 +2158,9 @@ class QuicStream extends Duplex {
if (this.destroyed || this.#closed)
return;

if (this.pending)
return this.on('ready', () => this[kClose](family, code));

this.#closed = true;

this.#aborted = this.readable || this.writable;
Expand Down Expand Up @@ -2175,6 +2188,10 @@ class QuicStream extends Duplex {
process.nextTick(emit.bind(this, 'abort', code, family));
}

get pending() {
return this.#id === undefined;
}

get aborted() {
return this.#aborted;
}
Expand Down Expand Up @@ -2226,7 +2243,13 @@ class QuicStream extends Duplex {

[kWriteGeneric](writev, data, encoding, cb) {
if (this.destroyed)
return;
return; // TODO(addaleax): Can this happen?

if (this.pending) {
return this.on('ready', () => {
this[kWriteGeneric](writev, data, encoding, cb);
});
}

this[kUpdateTimer]();
const req = (writev) ?
Expand All @@ -2251,6 +2274,9 @@ class QuicStream extends Duplex {
// coming so that a fin stream packet can be
// sent.
_final(cb) {
if (this.pending)
return this.on('ready', () => this._final(cb));

const handle = this[kHandle];
if (handle === undefined) {
cb();
Expand All @@ -2267,7 +2293,10 @@ class QuicStream extends Duplex {
}

_read(nread) {
if (this.destroyed) {
if (this.pending)
return this.on('ready', () => this._read(nread));

if (this.destroyed) { // TODO(addaleax): Can this happen?
this.push(null);
return;
}
Expand Down Expand Up @@ -2317,6 +2346,12 @@ class QuicStream extends Duplex {
else if (typeof fd !== 'number')
throw new ERR_INVALID_ARG_TYPE('fd', ['number', 'FileHandle'], fd);

if (this.pending) {
return this.on('ready', () => {
this.sendFD(fd, { offset, length }, ownsFd);
});
}

this[kUpdateTimer]();
this.ownsFd = ownsFd;

Expand Down

0 comments on commit 392f3f2

Please sign in to comment.