Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

quic: support AbortSignal in QuicSocket connect/listen #34908

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions deps/nghttp3/lib/nghttp3_ringbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,25 @@

#include "nghttp3_macro.h"

#if defined(_MSC_VER) && defined(_M_ARM64)
unsigned int __popcnt(unsigned int x) {
#if defined(_WIN32)
# if defined(_M_ARM64)
unsigned int __nghttp3_popcnt(unsigned int x) {
unsigned int c = 0;
for (; x; ++c) {
x &= x - 1;
}
return c;
}
# else
# define __nghttp3_popcnt __popcnt
# endif
#endif

int nghttp3_ringbuf_init(nghttp3_ringbuf *rb, size_t nmemb, size_t size,
const nghttp3_mem *mem) {
if (nmemb) {
#ifdef WIN32
assert(1 == __popcnt((unsigned int)nmemb));
assert(1 == __nghttp3_popcnt((unsigned int)nmemb));
#else
assert(1 == __builtin_popcount((unsigned int)nmemb));
#endif
Expand Down Expand Up @@ -127,7 +131,7 @@ int nghttp3_ringbuf_reserve(nghttp3_ringbuf *rb, size_t nmemb) {
}

#ifdef WIN32
assert(1 == __popcnt((unsigned int)nmemb));
assert(1 == __nghttp3_popcnt((unsigned int)nmemb));
#else
assert(1 == __builtin_popcount((unsigned int)nmemb));
#endif
Expand Down
10 changes: 7 additions & 3 deletions deps/ngtcp2/lib/ngtcp2_ringbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,24 @@

#include "ngtcp2_macro.h"

#if defined(_MSC_VER) && defined(_M_ARM64)
unsigned int __popcnt(unsigned int x) {
#if defined(_WIN32)
# if defined(_M_ARM64)
unsigned int __ngtcp2_popcnt(unsigned int x) {
unsigned int c = 0;
for (; x; ++c) {
x &= x - 1;
}
return c;
}
# else
# define __ngtcp2_popcnt __popcnt
# endif
#endif

int ngtcp2_ringbuf_init(ngtcp2_ringbuf *rb, size_t nmemb, size_t size,
const ngtcp2_mem *mem) {
#ifdef WIN32
assert(1 == __popcnt((unsigned int)nmemb));
assert(1 == __ngtcp2_popcnt((unsigned int)nmemb));
#else
assert(1 == __builtin_popcount((unsigned int)nmemb));
#endif
Expand Down
4 changes: 4 additions & 0 deletions doc/api/quic.md
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,8 @@ added: REPLACEME
* `maxStreamDataUni` {number}
* `maxStreamsBidi` {number}
* `maxStreamsUni` {number}
* `signal` {AbortSignal} Optionally allows the `connect()` to be canceled
using an `AbortController`.
* `h3` {Object} HTTP/3 Specific Configuration Options
* `qpackMaxTableCapacity` {number}
* `qpackBlockedStreams` {number}
Expand Down Expand Up @@ -1830,6 +1832,8 @@ added: REPLACEME
[OpenSSL Options][].
* `sessionIdContext` {string} Opaque identifier used by servers to ensure
session state is not shared between applications. Unused by clients.
* `signal` {AbortSignal} Optionally allows the `listen()` to be canceled
using an `AbortController`.
* Returns: {Promise}

Listen for new peer-initiated sessions. Returns a `Promise` that is resolved
Expand Down
44 changes: 41 additions & 3 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ class QuicEndpoint {
if (state.bindPromise !== undefined)
return state.bindPromise;

return state.bindPromise = this[kBind]().finally(() => {
return state.bindPromise = this[kBind](options).finally(() => {
state.bindPromise = undefined;
});
}
Expand Down Expand Up @@ -1187,6 +1187,15 @@ class QuicSocket extends EventEmitter {
...options,
};

const { signal } = options;
if (signal != null && !('aborted' in signal))
throw new ERR_INVALID_ARG_TYPE('options.signal', 'AbortSignal', signal);

// If an AbortSignal was passed in, check to make sure it is not already
// aborted before we continue on to do any work.
if (signal && signal.aborted)
throw new lazyDOMException('The operation was aborted', 'AbortError');

// The ALPN protocol identifier is strictly required.
const {
alpn,
Expand All @@ -1211,7 +1220,10 @@ class QuicSocket extends EventEmitter {
state.ocspHandler = ocspHandler;
state.clientHelloHandler = clientHelloHandler;

await this[kMaybeBind]();
await this[kMaybeBind]({ signal });

if (signal && signal.aborted)
throw new lazyDOMException('The operation was aborted', 'AbortError');

// It's possible that the QuicSocket was destroyed or closed while
// the bind was pending. Check for that and handle accordingly.
Expand All @@ -1226,6 +1238,9 @@ class QuicSocket extends EventEmitter {
type
} = await resolvePreferredAddress(lookup, transportParams.preferredAddress);

if (signal && signal.aborted)
throw new lazyDOMException('The operation was aborted', 'AbortError');

// It's possible that the QuicSocket was destroyed or closed while
// the preferred address resolution was pending. Check for that and handle
// accordingly.
Expand Down Expand Up @@ -1264,6 +1279,14 @@ class QuicSocket extends EventEmitter {
// while the nextTick is pending. If that happens, do nothing.
if (this.destroyed || this.closing)
return;

// The abort signal was triggered while this was pending,
// destroy the QuicSocket with an error.
if (signal && signal.aborted) {
this.destroy(
new lazyDOMException('The operation was aborted', 'AbortError'));
return;
}
try {
this.emit('listening');
} catch (error) {
Expand All @@ -1284,13 +1307,25 @@ class QuicSocket extends EventEmitter {
...options
};

const { signal } = options;
if (signal != null && !('aborted' in signal))
throw new ERR_INVALID_ARG_TYPE('options.signal', 'AbortSignal', signal);

// If an AbortSignal was passed in, check to make sure it is not already
// aborted before we continue on to do any work.
if (signal && signal.aborted)
throw new lazyDOMException('The operation was aborted', 'AbortError');

const {
type,
address,
lookup = state.lookup
} = validateQuicSocketConnectOptions(options);

await this[kMaybeBind]();
await this[kMaybeBind]({ signal });

if (signal && signal.aborted)
throw new lazyDOMException('The operation was aborted', 'AbortError');

if (this.destroyed)
throw new ERR_INVALID_STATE('QuicSocket was destroyed');
Expand All @@ -1302,6 +1337,9 @@ class QuicSocket extends EventEmitter {
} = await lookup(addressOrLocalhost(address, type),
type === AF_INET6 ? 6 : 4);

if (signal && signal.aborted)
throw new lazyDOMException('The operation was aborted', 'AbortError');

if (this.destroyed)
throw new ERR_INVALID_STATE('QuicSocket was destroyed');
if (this.closing)
Expand Down
108 changes: 54 additions & 54 deletions src/quic/node_quic_http3_application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,47 +122,55 @@ Http3Application::Http3Application(
int64_t Http3Application::CreateAndBindPushStream(int64_t push_id) {
CHECK(session()->is_server());
int64_t stream_id;
if (!session()->OpenUnidirectionalStream(&stream_id))
return 0;
return nghttp3_conn_bind_push_stream(
connection(),
push_id,
stream_id) == 0 ? stream_id : 0;
return session()->OpenUnidirectionalStream(&stream_id) &&
nghttp3_conn_bind_push_stream(connection(), push_id, stream_id) == 0
? stream_id : 0;
}

bool Http3Application::SubmitPushPromise(
Http3Application::PushInfo Http3Application::SubmitPushPromise(
int64_t id,
int64_t* push_id,
int64_t* stream_id,
const Http3Headers& headers) {
// Successfully creating the push promise and opening the
// fulfillment stream will queue nghttp3 up to send data.
// Creating the SendSessionScope here ensures that when
// SubmitPush exits, SendPendingData will be called if
// we are not within the context of an ngtcp2 callback.
QuicSession::SendSessionScope send_scope(session());
PushInfo info{};

Debug(
session(),
"Submitting %d push promise headers",
headers.length());
session(),
"Submitting %d push promise headers",
headers.length());
if (nghttp3_conn_submit_push_promise(
connection(),
push_id,
&info.push_id,
id,
headers.data(),
headers.length()) != 0) {
return false;
headers.length()) == 0) {
// Once we've successfully submitted the push promise and have
// a push id assigned, we create the push fulfillment stream.
info.stream_id = CreateAndBindPushStream(info.push_id);
Debug(
session(),
"Push stream created and bound. Push ID: %" PRId64
". Stream ID: %" PRId64,
info.push_id,
info.stream_id);
}
// Once we've successfully submitting the push promise and have
// a push id assigned, we create the push fulfillment stream.
*stream_id = CreateAndBindPushStream(*push_id);
return *stream_id != 0; // push stream can never use stream id 0
return info;
}

// Information headers are 1xx status blocks that are transmitted
// before the initial response headers. They should only ever be
// transmitted by the server, however, other than checking that
// this QuicSession is a server, we do not perform any additional
// verification.
bool Http3Application::SubmitInformation(
int64_t id,
const Http3Headers& headers) {
if (!session()->is_server())
return false;
QuicSession::SendSessionScope send_scope(session());
Debug(
session(),
Expand All @@ -176,6 +184,8 @@ bool Http3Application::SubmitInformation(
headers.length()) == 0;
}

// Trailers are headers that are transmitted after the HTTP message
// payload and may be sent by either server or client.
bool Http3Application::SubmitTrailers(
int64_t id,
const Http3Headers& headers) {
Expand Down Expand Up @@ -232,24 +242,20 @@ bool Http3Application::SubmitHeaders(
// The headers block passed to the submit push contains the assumed
// *request* headers. The response headers are provided using the
// SubmitHeaders() function on the created QuicStream.
//
// A push can only be submitted on the server-side.
BaseObjectPtr<QuicStream> Http3Application::SubmitPush(
int64_t id,
Local<Array> headers) {
// If the QuicSession is not a server session, return false
// immediately. Push streams cannot be sent by an HTTP/3 client.
if (!session()->is_server())
return {};

Http3Headers nva(env(), headers);
int64_t push_id;
int64_t stream_id;
Http3Application::PushInfo info{};

// There are several reasons why push may fail. We currently handle
// them all the same. Later we might want to differentiate when the
// return value is NGHTTP3_ERR_PUSH_ID_BLOCKED.
return SubmitPushPromise(id, &push_id, &stream_id, nva) ?
QuicStream::New(session(), stream_id, push_id) :
BaseObjectPtr<QuicStream>();
if (session()->is_server())
info = SubmitPushPromise(id, Http3Headers(env(), headers));

return info.stream_id != 0
? QuicStream::New(session(), info.stream_id, info.push_id)
: BaseObjectPtr<QuicStream>();
}

// Submit informational headers (response headers that use a 1xx
Expand All @@ -259,10 +265,7 @@ BaseObjectPtr<QuicStream> Http3Application::SubmitPush(
bool Http3Application::SubmitInformation(
int64_t stream_id,
Local<Array> headers) {
if (!session()->is_server())
return false;
Http3Headers nva(session()->env(), headers);
return SubmitInformation(stream_id, nva);
return SubmitInformation(stream_id, Http3Headers(session()->env(), headers));
}

// For client sessions, submits request headers. For server sessions,
Expand All @@ -271,16 +274,17 @@ bool Http3Application::SubmitHeaders(
int64_t stream_id,
Local<Array> headers,
uint32_t flags) {
Http3Headers nva(session()->env(), headers);
return SubmitHeaders(stream_id, nva, flags);
return SubmitHeaders(
stream_id,
Http3Headers(session()->env(), headers),
flags);
}

// Submits trailing headers for the HTTP/3 request or response.
bool Http3Application::SubmitTrailers(
int64_t stream_id,
Local<Array> headers) {
Http3Headers nva(session()->env(), headers);
return SubmitTrailers(stream_id, nva);
return SubmitTrailers(stream_id, Http3Headers(session()->env(), headers));
}

void Http3Application::CheckAllocatedSize(size_t previous_size) const {
Expand Down Expand Up @@ -387,13 +391,12 @@ bool Http3Application::Initialize() {
params.initial_max_streams_bidi);
}

if (!CreateAndBindControlStream() ||
!CreateAndBindQPackStreams()) {
return false;
if (CreateAndBindControlStream() && CreateAndBindQPackStreams()) {
set_init_done();
return true;
}

set_init_done();
return true;
return false;
}

// All HTTP/3 control, header, and stream data arrives as QUIC stream data.
Expand Down Expand Up @@ -646,7 +649,7 @@ void Http3Application::BeginHeaders(
// by the QuicStream until stream->EndHeaders() is called, during which
// the collected headers are converted to an array and passed off to
// the javascript side.
bool Http3Application::ReceiveHeader(
void Http3Application::ReceiveHeader(
int64_t stream_id,
int32_t token,
nghttp3_rcbuf* name,
Expand All @@ -671,9 +674,9 @@ bool Http3Application::ReceiveHeader(
name,
value,
flags);
return stream->AddHeader(std::move(header));
// At this level, we don't care if the header is added or not.
USE(stream->AddHeader(std::move(header)));
}
return true;
}

// Marks the completion of a headers block.
Expand Down Expand Up @@ -830,10 +833,8 @@ int Http3Application::OnReceiveHeader(
void* conn_user_data,
void* stream_user_data) {
Http3Application* app = static_cast<Http3Application*>(conn_user_data);
// TODO(@jasnell): Need to determine the appropriate response code here
// for when the header is not going to be accepted.
return app->ReceiveHeader(stream_id, token, name, value, flags) ?
0 : NGHTTP3_ERR_CALLBACK_FAILURE;
app->ReceiveHeader(stream_id, token, name, value, flags);
return 0;
}

int Http3Application::OnEndHeaders(
Expand Down Expand Up @@ -868,8 +869,7 @@ int Http3Application::OnReceivePushPromise(
void* conn_user_data,
void* stream_user_data) {
Http3Application* app = static_cast<Http3Application*>(conn_user_data);
if (!app->ReceiveHeader(stream_id, token, name, value, flags))
return NGHTTP3_ERR_CALLBACK_FAILURE;
app->ReceiveHeader(stream_id, token, name, value, flags);
return 0;
}

Expand Down
Loading