Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

quic: refactor to use more primordials #36211

Merged
merged 1 commit into from
Nov 25, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 93 additions & 63 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,23 @@ assertCrypto();

const {
ArrayFrom,
ArrayPrototypePush,
BigInt64Array,
Boolean,
Error,
FunctionPrototypeBind,
FunctionPrototypeCall,
Map,
Number,
Promise,
PromiseAll,
PromisePrototypeThen,
PromisePrototypeCatch,
PromisePrototypeFinally,
PromiseReject,
PromiseResolve,
Set,
ReflectApply,
SafeSet,
Symbol,
SymbolFor,
} = primordials;
Expand Down Expand Up @@ -302,22 +309,25 @@ function onSessionClose(code, family, silent, statelessReset) {
// being requested. It is only called if the 'clientHelloHandler' option is
// specified on listen().
function onSessionClientHello(alpn, servername, ciphers) {
this[owner_symbol][kClientHello](alpn, servername, ciphers)
.then((context) => {
PromisePrototypeThen(
this[owner_symbol][kClientHello](alpn, servername, ciphers),
(context) => {
if (context !== undefined && !context?.context)
throw new ERR_INVALID_ARG_TYPE('context', 'SecureContext', context);
this.onClientHelloDone(context?.context);
})
.catch((error) => this[owner_symbol].destroy(error));
},
(error) => this[owner_symbol].destroy(error)
);
}

// This callback is only ever invoked for QuicServerSession instances,
// and is used to trigger OCSP request processing when needed. The
// user callback must invoke .onCertDone() in order for the
// TLS handshake to continue.
function onSessionCert(servername) {
this[owner_symbol][kHandleOcsp](servername)
.then((data) => {
PromisePrototypeThen(
this[owner_symbol][kHandleOcsp](servername),
(data) => {
if (data !== undefined) {
if (typeof data === 'string')
data = Buffer.from(data);
Expand All @@ -329,17 +339,20 @@ function onSessionCert(servername) {
}
}
this.onCertDone(data);
})
.catch((error) => this[owner_symbol].destroy(error));
},
(error) => this[owner_symbol].destroy(error)
);
}

// This callback is only ever invoked for QuicClientSession instances,
// and is used to deliver the OCSP response as provided by the server.
// If the requestOCSP configuration option is false, this will never
// be called.
function onSessionStatus(data) {
this[owner_symbol][kHandleOcsp](data)
.catch((error) => this[owner_symbol].destroy(error));
PromisePrototypeCatch(
this[owner_symbol][kHandleOcsp](data),
(error) => this[owner_symbol].destroy(error)
);
}

// Called by the C++ internals when the TLS handshake is completed.
Expand Down Expand Up @@ -369,12 +382,13 @@ function onSessionHandshake(
// resumption and 0RTT.
function onSessionTicket(sessionTicket, transportParams) {
if (this[owner_symbol]) {
process.nextTick(
emit.bind(
this[owner_symbol],
'sessionTicket',
sessionTicket,
transportParams));
process.nextTick(FunctionPrototypeBind(
emit,
this[owner_symbol],
'sessionTicket',
sessionTicket,
transportParams
));
}
}

Expand All @@ -384,13 +398,14 @@ function onSessionTicket(sessionTicket, transportParams) {
function onSessionPathValidation(res, local, remote) {
const session = this[owner_symbol];
if (session) {
process.nextTick(
emit.bind(
session,
'pathValidation',
res === NGTCP2_PATH_VALIDATION_RESULT_FAILURE ? 'failure' : 'success',
local,
remote));
process.nextTick(FunctionPrototypeBind(
emit,
session,
'pathValidation',
res === NGTCP2_PATH_VALIDATION_RESULT_FAILURE ? 'failure' : 'success',
local,
remote
));
}
}

Expand Down Expand Up @@ -486,7 +501,7 @@ function onStreamHeaders(id, headers, kind, push_id) {
// When a stream is flow control blocked, causes a blocked event
// to be emitted. This is a purely informational event.
function onStreamBlocked() {
process.nextTick(emit.bind(this[owner_symbol], 'blocked'));
process.nextTick(FunctionPrototypeBind(emit, this[owner_symbol], 'blocked'));
}

// Register the callbacks with the QUIC internal binding.
Expand Down Expand Up @@ -543,14 +558,17 @@ function addressOrLocalhost(address, type) {
}

function deferredClosePromise(state) {
return state.closePromise = new Promise((resolve, reject) => {
state.closePromiseResolve = resolve;
state.closePromiseReject = reject;
}).finally(() => {
state.closePromise = undefined;
state.closePromiseResolve = undefined;
state.closePromiseReject = undefined;
});
return state.closePromise = PromisePrototypeFinally(
new Promise((resolve, reject) => {
state.closePromiseResolve = resolve;
state.closePromiseReject = reject;
}),
() => {
state.closePromise = undefined;
state.closePromiseResolve = undefined;
state.closePromiseReject = undefined;
}
);
}

async function resolvePreferredAddress(lookup, preferredAddress) {
Expand Down Expand Up @@ -640,7 +658,7 @@ class QuicEndpoint {
if (state.bindPromise !== undefined)
return state.bindPromise;

return state.bindPromise = this[kBind]().finally(() => {
return state.bindPromise = PromisePrototypeFinally(this[kBind](), () => {
state.bindPromise = undefined;
});
}
Expand Down Expand Up @@ -899,7 +917,7 @@ class QuicSocket extends EventEmitter {
closePromiseResolve: undefined,
closePromiseReject: undefined,
defaultEncoding: undefined,
endpoints: new Set(),
endpoints: new SafeSet(),
highWaterMark: undefined,
listenPending: false,
listenPromise: undefined,
Expand All @@ -908,7 +926,7 @@ class QuicSocket extends EventEmitter {
clientHelloHandler: undefined,
server: undefined,
serverSecureContext: undefined,
sessions: new Set(),
sessions: new SafeSet(),
state: kSocketUnbound,
sharedState: undefined,
stats: undefined,
Expand Down Expand Up @@ -1048,9 +1066,12 @@ class QuicSocket extends EventEmitter {
if (state.bindPromise !== undefined)
return state.bindPromise;

return state.bindPromise = this[kBind](options).finally(() => {
state.bindPromise = undefined;
});
return state.bindPromise = PromisePrototypeFinally(
this[kBind](options),
() => {
state.bindPromise = undefined;
}
);
}

async [kBind](options) {
Expand All @@ -1074,7 +1095,7 @@ class QuicSocket extends EventEmitter {

const binds = [];
for (const endpoint of state.endpoints)
binds.push(endpoint.bind({ signal }));
ArrayPrototypePush(binds, endpoint.bind({ signal }));

await PromiseAll(binds);

Expand Down Expand Up @@ -1169,9 +1190,12 @@ class QuicSocket extends EventEmitter {
if (state.listenPromise !== undefined)
return state.listenPromise;

return state.listenPromise = this[kListen](options).finally(() => {
state.listenPromise = undefined;
});
return state.listenPromise = PromisePrototypeFinally(
this[kListen](options),
() => {
state.listenPromise = undefined;
}
);
}

async [kListen](options) {
Expand Down Expand Up @@ -1388,8 +1412,9 @@ class QuicSocket extends EventEmitter {
// Otherwise, loop through each of the known sessions and close them.
const reqs = [promise];
for (const session of state.sessions) {
reqs.push(session.close()
.catch((error) => this.destroy(error)));
ArrayPrototypePush(reqs,
PromisePrototypeCatch(session.close(),
(error) => this.destroy(error)));
}
return PromiseAll(reqs);
}
Expand Down Expand Up @@ -1441,11 +1466,11 @@ class QuicSocket extends EventEmitter {
if (error) {
if (typeof state.closePromiseReject === 'function')
state.closePromiseReject(error);
process.nextTick(emit.bind(this, 'error', error));
process.nextTick(FunctionPrototypeBind(emit, this, 'error', error));
} else if (typeof state.closePromiseResolve === 'function') {
state.closePromiseResolve();
}
process.nextTick(emit.bind(this, 'close'));
process.nextTick(FunctionPrototypeBind(emit, this, 'close'));
}

ref() {
Expand Down Expand Up @@ -1714,14 +1739,17 @@ class QuicSession extends EventEmitter {
if (state.handshakeCompletePromise !== undefined)
return state.handshakeCompletePromise;

state.handshakeCompletePromise = new Promise((resolve, reject) => {
state.handshakeCompletePromiseResolve = resolve;
state.handshakeCompletePromiseReject = reject;
}).finally(() => {
state.handshakeCompletePromise = undefined;
state.handshakeCompletePromiseReject = undefined;
state.handshakeCompletePromiseResolve = undefined;
});
state.handshakeCompletePromise = PromisePrototypeFinally(
new Promise((resolve, reject) => {
state.handshakeCompletePromiseResolve = resolve;
state.handshakeCompletePromiseReject = reject;
}),
() => {
state.handshakeCompletePromise = undefined;
state.handshakeCompletePromiseReject = undefined;
state.handshakeCompletePromiseResolve = undefined;
}
);

return state.handshakeCompletePromise;
}
Expand Down Expand Up @@ -1985,7 +2013,7 @@ class QuicSession extends EventEmitter {
if (error) {
if (typeof state.closePromiseReject === 'function')
state.closePromiseReject(error);
process.nextTick(emit.bind(this, 'error', error));
process.nextTick(FunctionPrototypeBind(emit, this, 'error', error));
} else if (typeof state.closePromiseResolve === 'function')
state.closePromiseResolve();

Expand All @@ -1994,7 +2022,7 @@ class QuicSession extends EventEmitter {
new ERR_OPERATION_FAILED('Handshake failed'));
}

process.nextTick(emit.bind(this, 'close'));
process.nextTick(FunctionPrototypeBind(emit, this, 'close'));
}

// For server QuicSession instances, true if earlyData is
Expand Down Expand Up @@ -2698,7 +2726,7 @@ class QuicStream extends Duplex {
default:
assert.fail('Invalid headers kind');
}
process.nextTick(emit.bind(this, name, headers, push_id));
process.nextTick(FunctionPrototypeBind(emit, this, name, headers, push_id));
}

[kAfterAsyncWrite]({ bytes }) {
Expand Down Expand Up @@ -2809,7 +2837,7 @@ class QuicStream extends Duplex {
if (!this.destroyed) {
if (!this.detached)
this[kInternalState].sharedState.writeEnded = true;
super.end.apply(this, args);
ReflectApply(super.end, this, args);
}
return this;
}
Expand All @@ -2825,13 +2853,14 @@ class QuicStream extends Duplex {
state.didRead = true;
}

streamOnResume.call(this);
FunctionPrototypeCall(streamOnResume, this);
}

sendFile(path, options = {}) {
if (this.detached)
throw new ERR_INVALID_STATE('Unable to send file');
fs.open(path, 'r', QuicStream[kOnFileOpened].bind(this, options));
fs.open(path, 'r',
FunctionPrototypeBind(QuicStream[kOnFileOpened], this, options));
}

static [kOnFileOpened](options, err, fd) {
Expand All @@ -2847,7 +2876,7 @@ class QuicStream extends Duplex {
}

if (this.destroyed || this.closed) {
fs.close(fd, (err) => { if (err) throw err; });
fs.close(fd, assert.ifError);
return;
}

Expand Down Expand Up @@ -2895,7 +2924,8 @@ class QuicStream extends Duplex {
static [kOnFileUnpipe]() { // Called on the StreamPipe instance.
const stream = this.sink[owner_symbol];
if (stream.ownsFd)
this.source.close().catch(stream.destroy.bind(stream));
PromisePrototypeCatch(this.source.close(),
FunctionPrototypeBind(stream.destroy, stream));
else
this.source.releaseFD();
stream.end();
Expand Down