From 6a4f1917be3741b43405d11cb4ab7a9834658d16 Mon Sep 17 00:00:00 2001 From: Gabriel Schulhof Date: Sun, 17 Jun 2018 11:52:49 -0400 Subject: [PATCH] n-api: add API for asynchronous functions Bundle a `uv_async_t`, a `uv_idle_t`, a `uv_mutex_t`, a `uv_cond_t`, and a `v8::Persistent` to make it possible to call into JS from another thread. The API accepts a void data pointer and a callback which will be invoked on the loop thread and which will receive the `napi_value` representing the JavaScript function to call so as to perform the call into JS. The callback is run inside a `node::CallbackScope`. A `std::queue` is used to store calls from the secondary threads, and an idle loop is started by the `uv_async_t` callback on the loop thread to drain the queue, calling into JS with each item. Items can be added to the queue blockingly or non-blockingly. The thread-safe function can be referenced or unreferenced, with the same semantics as libuv handles. Re: https://github.com/nodejs/help/issues/1035 Re: https://github.com/nodejs/node/issues/20964 Fixes: https://github.com/nodejs/node/issues/13512 --- doc/api/errors.md | 30 ++ doc/api/n-api.md | 370 ++++++++++++++- src/node_api.cc | 433 ++++++++++++++++++ src/node_api.h | 38 ++ src/node_api_types.h | 28 +- .../test_threadsafe_function/binding.c | 254 ++++++++++ .../test_threadsafe_function/binding.gyp | 8 + .../test_threadsafe_function/test.js | 166 +++++++ 8 files changed, 1325 insertions(+), 2 deletions(-) create mode 100644 test/addons-napi/test_threadsafe_function/binding.c create mode 100644 test/addons-napi/test_threadsafe_function/binding.gyp create mode 100644 test/addons-napi/test_threadsafe_function/test.js diff --git a/doc/api/errors.md b/doc/api/errors.md index c0773caf7c9..b8cd0b22772 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -1371,6 +1371,36 @@ multiple of the element size. While calling `napi_create_typedarray()`, `(length * size_of_element) + byte_offset` was larger than the length of given `buffer`. + +### ERR_NAPI_TSFN_CALL_JS + +An error occurred while invoking the JavaScript portion of the thread-safe +function. + + +### ERR_NAPI_TSFN_GET_UNDEFINED + +An error occurred while attempting to retrieve the JavaScript `undefined` +value. + + +### ERR_NAPI_TSFN_REMOVE_ENV_CLEANUP + +An error occured while attempting to remove the environment cleanup handler. + + +### ERR_NAPI_TSFN_START_IDLE_LOOP + +On the main thread, values are removed from the queue associated with the +thread-safe function in an idle loop. This error indicates that an error +has occurred when attemping to start the loop. + + +### ERR_NAPI_TSFN_STOP_IDLE_LOOP + +Once no more items are left in the queue, the idle loop must be suspended. This +error indicates that the idle loop has failed to stop. + ### ERR_NO_CRYPTO diff --git a/doc/api/n-api.md b/doc/api/n-api.md index ef2396409b4..8b4cd3a8474 100644 --- a/doc/api/n-api.md +++ b/doc/api/n-api.md @@ -75,7 +75,9 @@ typedef enum { napi_cancelled, napi_escape_called_twice, napi_handle_scope_mismatch, - napi_callback_scope_mismatch + napi_callback_scope_mismatch, + napi_queue_full, + napi_closing, } napi_status; ``` If additional information is required upon an API returning a failed status, @@ -113,6 +115,43 @@ not allowed. ### napi_value This is an opaque pointer that is used to represent a JavaScript value. +### napi_threadsafe_function + +> Stability: 1 - Experimental + +This is an opaque pointer that represents a JavaScript function which can be +called asynchronously from multiple threads via +`napi_call_threadsafe_function()`. + +### napi_threadsafe_function_release_mode + +> Stability: 1 - Experimental + +A value to be given to `napi_release_threadsafe_function()` to indicate whether +the thread-safe function is to be closed immediately (`napi_tsfn_abort`) or +merely released (`napi_tsfn_release`) and thus available for subsequent use via +`napi_acquire_threadsafe_function()` and `napi_call_threadsafe_function()`. +```C +typedef enum { + napi_tsfn_release, + napi_tsfn_abort +} napi_threadsafe_function_release_mode; +``` + +### napi_threadsafe_function_call_mode + +> Stability: 1 - Experimental + +A value to be given to `napi_call_threadsafe_function()` to indicate whether +the call should block whenever the queue associated with the thread-safe +function is full. +```C +typedef enum { + napi_tsfn_nonblocking, + napi_tsfn_blocking +} napi_threadsafe_function_call_mode; +``` + ### N-API Memory Management types #### napi_handle_scope This is an abstraction used to control and modify the lifetime of objects @@ -194,6 +233,43 @@ typedef void (*napi_async_complete_callback)(napi_env env, void* data); ``` +#### napi_threadsafe_function_call_js + +> Stability: 1 - Experimental + +Function pointer used with asynchronous thread-safe function calls. The callback +will be called on the main thread. Its purpose is to use a data item arriving +via the queue from one of the secondary threads to construct the parameters +necessary for a call into JavaScript, usually via `napi_call_function`, and then +make the call into JavaScript. + +The data arriving from the secondary thread via the queue is given in the `data` +parameter and the JavaScript function to call is given in the `js_callback` +parameter. + +N-API sets up the environment prior to calling this callback, so it is +sufficient to call the JavaScript function via `napi_call_function` rather than +via `napi_make_callback`. + +Callback functions must satisfy the following signature: +```C +typedef void (*napi_threadsafe_function_call_js)(napi_env env, + napi_value js_callback, + void* context, + void* data); +``` +- `[in] env`: The environment to use for API calls, or `NULL` if the thread-safe +function is being torn down and `data` may need to be freed. +- `[in] js_callback`: The JavaScript function to call, or `NULL` if the +thread-safe function is being torn down and `data` may need to be freed. +- `[in] context`: The optional data with which the thread-safe function was +created. +- `[in] data`: Data created by the secondary thread. It is the responsibility of +the callback to convert this native data to JavaScript values (with N-API +functions) that can be passed as parameters when `js_callback` is invoked. This +pointer is managed entirely by the threads and this callback. Thus this callback +should free the data. + ## Error Handling N-API uses both return values and JavaScript exceptions for error handling. The following sections explain the approach for each case. @@ -3852,6 +3928,296 @@ NAPI_EXTERN napi_status napi_get_uv_event_loop(napi_env env, - `[in] env`: The environment that the API is invoked under. - `[out] loop`: The current libuv loop instance. + + +## Asynchronous Thread-safe Function Calls + +> Stability: 1 - Experimental + +JavaScript functions can normally only be called from a native addon's main +thread. If an addon creates additional threads, then N-API functions that +require a `napi_env`, `napi_value`, or `napi_ref` must not be called from those +threads. + +When an addon has additional threads and JavaScript functions need to be invoked +based on the processing completed by those threads, those threads must +communicate with the addon's main thread so that the main thread can invoke the +JavaScript function on their behalf. The thread-safe function APIs provide an +easy way to do this. + +These APIs provide the type `napi_threadsafe_function` as well as APIs to +create, destroy, and call objects of this type. +`napi_create_threadsafe_function()` creates a persistent reference to a +`napi_value` that holds a JavaScript function which can be called from multiple +threads. The calls happen asynchronously. This means that values with which the +JavaScript callback is to be called will be placed in a queue, and, for each +value in the queue, a call will eventually be made to the JavaScript function. + +Upon creation of a `napi_threadsafe_function` a `napi_finalize` callback can be +provided. This callback will be invoked on the main thread when the thread-safe +function is about to be destroyed. It receives the context and the finalize data +given during construction, and provides an opportunity for cleaning up after the +threads e.g. by calling `uv_thread_join()`. **It is important that, aside from +the main loop thread, there be no threads left using the thread-safe function +after the finalize callback completes.** + +The `context` given during the call to `napi_create_threadsafe_function()` can +be retrieved from any thread with a call to +`napi_get_threadsafe_function_context()`. + +`napi_call_threadsafe_function()` can then be used for initiating a call into +JavaScript. `napi_call_threadsafe_function()` accepts a parameter which controls +whether the API behaves blockingly. If set to `napi_tsfn_nonblocking`, the API +behaves non-blockingly, returning `napi_queue_full` if the queue was full, +preventing data from being successfully added to the queue. If set to +`napi_tsfn_blocking`, the API blocks until space becomes available in the queue. +`napi_call_threadsafe_function()` never blocks if the thread-safe function was +created with a maximum queue size of 0. + +The actual call into JavaScript is controlled by the callback given via the +`call_js_cb` parameter. `call_js_cb` is invoked on the main thread once for each +value that was placed into the queue by a successful call to +`napi_call_threadsafe_function()`. If such a callback is not given, a default +callback will be used, and the resulting JavaScript call will have no arguments. +The `call_js_cb` callback receives the JavaScript function to call as a +`napi_value` in its parameters, as well as the `void*` context pointer used when +creating the `napi_threadsafe_function`, and the next data pointer that was +created by one of the secondary threads. The callback can then use an API such +as `napi_call_function()` to call into JavaScript. + +The callback may also be invoked with `env` and `call_js_cb` both set to `NULL` +to indicate that calls into JavaScript are no longer possible, while items +remain in the queue that may need to be freed. This normally occurs when the +Node.js process exits while there is a thread-safe function still active. + +It is not necessary to call into JavaScript via `napi_make_callback()` because +N-API runs `call_js_cb` in a context appropriate for callbacks. + +Threads can be added to and removed from a `napi_threadsafe_function` object +during its existence. Thus, in addition to specifying an initial number of +threads upon creation, `napi_acquire_threadsafe_function` can be called to +indicate that a new thread will start making use of the thread-safe function. +Similarly, `napi_release_threadsafe_function` can be called to indicate that an +existing thread will stop making use of the thread-safe function. + +`napi_threadsafe_function` objects are destroyed when every thread which uses +the object has called `napi_release_threadsafe_function()` or has received a +return status of `napi_closing` in response to a call to +`napi_call_threadsafe_function`. The queue is emptied before the +`napi_threadsafe_function` is destroyed. It is important that +`napi_release_threadsafe_function()` be the last API call made in conjunction +with a given `napi_threadsafe_function`, because after the call completes, there +is no guarantee that the `napi_threadsafe_function` is still allocated. For the +same reason it is also important that no more use be made of a thread-safe +function after receiving a return value of `napi_closing` in response to a call +to `napi_call_threadsafe_function`. Data associated with the +`napi_threadsafe_function` can be freed in its `napi_finalize` callback which +was passed to `napi_create_threadsafe_function()`. + +Once the reference count of a `napi_threadsafe_function` reaches zero, it can no +longer be incremented. In fact, all APIs associated with it will return an error +value of `napi_closing`. + +The thread-safe function can be "aborted" by giving a value of `napi_tsfn_abort` +to `napi_release_threadsafe_function()`. This will cause all subsequent APIs +associated with the thread-safe function except +`napi_release_threadsafe_function()` to return `napi_closing` even before its +reference count reaches zero. In particular, `napi_call_threadsafe_function()` +will return `napi_closing`, thus informing the threads that it is no longer +possible to make asynchronous calls to the thread-safe function. This can be +used as a criterion for terminating the thread. **Upon receiving a return value +of `napi_closing` from `napi_call_threadsafe_function()` a thread must make no +further use of the thread-safe function because it is no longer guaranteed to +be allocated.** + +Similarly to libuv handles, thread-safe functions can be "referenced" and +"unreferenced". A "referenced" thread-safe function will cause the event loop on +the thread on which it is created to remain alive until the thread-safe function +is destroyed. In contrast, an "unreferenced" thread-safe function will not +prevent the event loop from exiting. The APIs `napi_ref_threadsafe_function` and +`napi_unref_threadsafe_function` exist for this purpose. + +### napi_create_threadsafe_function + +> Stability: 1 - Experimental + + +```C +NAPI_EXTERN napi_status +napi_create_threadsafe_function(napi_env env, + napi_value func, + napi_value async_resource, + napi_value async_resource_name, + size_t max_queue_size, + size_t initial_thread_count, + void* thread_finalize_data, + napi_finalize thread_finalize_cb, + void* context, + napi_threadsafe_function_call_js call_js_cb, + napi_threadsafe_function* result); +``` + +- `[in] env`: The environment that the API is invoked under. +- `[in] func`: The JavaScript function to call from another thread. +- `[in] async_resource`: An optional object associated with the async work that +will be passed to possible `async_hooks` [`init` hooks][]. +- `[in] async_resource_name`: A javaScript string to provide an identifier for +the kind of resource that is being provided for diagnostic information exposed +by the `async_hooks` API. +- `[in] max_queue_size`: Maximum size of the queue. 0 for no limit. +- `[in] initial_thread_count`: The initial number of threads, including the main +thread, which will be making use of this function. +- `[in] thread_finalize_data`: Data to be passed to `thread_finalize_cb`. +- `[in] thread_finalize_cb`: Function to call when the +`napi_threadsafe_function` is being destroyed. +- `[in] context`: Optional data to attach to the resulting +`napi_threadsafe_function`. +- `[in] call_js_cb`: Optional callback which calls the JavaScript function in +response to a call on a different thread. This callback will be called on the +main thread. If not given, the JavaScript function will be called with no +parameters and with `undefined` as its `this` value. +- `[out] result`: The asynchronous thread-safe JavaScript function. + +### napi_get_threadsafe_function_context + +> Stability: 1 - Experimental + + +```C +NAPI_EXTERN napi_status +napi_get_threadsafe_function_context(napi_threadsafe_function func, + void** result); +``` + +- `[in] func`: The thread-safe function for which to retrieve the context. +- `[out] context`: The location where to store the context. + +This API may be called from any thread which makes use of `func`. + +### napi_call_threadsafe_function + +> Stability: 1 - Experimental + + +```C +NAPI_EXTERN napi_status +napi_call_threadsafe_function(napi_threadsafe_function func, + void* data, + napi_threadsafe_function_call_mode is_blocking); +``` + +- `[in] func`: The asynchronous thread-safe JavaScript function to invoke. +- `[in] data`: Data to send into JavaScript via the callback `call_js_cb` +provided during the creation of the thread-safe JavaScript function. +- `[in] is_blocking`: Flag whose value can be either `napi_tsfn_blocking` to +indicate that the call should block if the queue is full or +`napi_tsfn_nonblocking` to indicate that the call should return immediately with +a status of `napi_queue_full` whenever the queue is full. + +This API will return `napi_closing` if `napi_release_threadsafe_function()` was +called with `abort` set to `napi_tsfn_abort` from any thread. The value is only +added to the queue if the API returns `napi_ok`. + +This API may be called from any thread which makes use of `func`. + +### napi_acquire_threadsafe_function + +> Stability: 1 - Experimental + + +```C +NAPI_EXTERN napi_status +napi_acquire_threadsafe_function(napi_threadsafe_function func); +``` + +- `[in] func`: The asynchronous thread-safe JavaScript function to start making +use of. + +A thread should call this API before passing `func` to any other thread-safe +function APIs to indicate that it will be making use of `func`. This prevents +`func` from being destroyed when all other threads have stopped making use of +it. + +This API may be called from any thread which will start making use of `func`. + +### napi_release_threadsafe_function + +> Stability: 1 - Experimental + + +```C +NAPI_EXTERN napi_status +napi_release_threadsafe_function(napi_threadsafe_function func, + napi_threadsafe_function_release_mode mode); +``` + +- `[in] func`: The asynchronous thread-safe JavaScript function whose reference +count to decrement. +- `[in] mode`: Flag whose value can be either `napi_tsfn_release` to indicate +that the current thread will make no further calls to the thread-safe function, +or `napi_tsfn_abort` to indicate that in addition to the current thread, no +other thread should make any further calls to the thread-safe function. If set +to `napi_tsfn_abort`, further calls to `napi_call_threadsafe_function()` will +return `napi_closing`, and no further values will be placed in the queue. + +A thread should call this API when it stops making use of `func`. Passing `func` +to any thread-safe APIs after having called this API has undefined results, as +`func` may have been destroyed. + +This API may be called from any thread which will stop making use of `func`. + +### napi_ref_threadsafe_function + +> Stability: 1 - Experimental + + +```C +NAPI_EXTERN napi_status +napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func); +``` + +- `[in] env`: The environment that the API is invoked under. +- `[in] func`: The thread-safe function to reference. + +This API is used to indicate that the event loop running on the main thread +should not exit until `func` has been destroyed. Similar to [`uv_ref`][] it is +also idempotent. + +This API may only be called from the main thread. + +### napi_unref_threadsafe_function + +> Stability: 1 - Experimental + + +```C +NAPI_EXTERN napi_status +napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func); +``` + +- `[in] env`: The environment that the API is invoked under. +- `[in] func`: The thread-safe function to unreference. + +This API is used to indicate that the event loop running on the main thread +may exit before `func` is destroyed. Similar to [`uv_unref`][] it is also +idempotent. + +This API may only be called from the main thread. + [ECMAScript Language Specification]: https://tc39.github.io/ecma262/ [Error Handling]: #n_api_error_handling [Native Abstractions for Node.js]: https://github.com/nodejs/nan @@ -3914,6 +4280,8 @@ NAPI_EXTERN napi_status napi_get_uv_event_loop(napi_env env, [`napi_throw_type_error`]: #n_api_napi_throw_type_error [`napi_unwrap`]: #n_api_napi_unwrap [`napi_wrap`]: #n_api_napi_wrap +[`uv_ref`]: http://docs.libuv.org/en/v1.x/handle.html#c.uv_ref +[`uv_unref`]: http://docs.libuv.org/en/v1.x/handle.html#c.uv_unref [`process.release`]: process.html#process_process_release [`init` hooks]: async_hooks.html#async_hooks_init_asyncid_type_triggerasyncid_resource diff --git a/src/node_api.cc b/src/node_api.cc index b1b498958b8..f66c5e17a5b 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -5,6 +5,7 @@ #include #include #include +#define NAPI_EXPERIMENTAL #include "node_api.h" #include "node_internals.h" #include "env.h" @@ -3553,3 +3554,435 @@ napi_status napi_run_script(napi_env env, *result = v8impl::JsValueFromV8LocalValue(script_result.ToLocalChecked()); return GET_RETURN_STATUS(env); } + +class TsFn: public node::AsyncResource { + public: + TsFn(v8::Local func, + v8::Local resource, + v8::Local name, + size_t thread_count_, + void* context_, + size_t max_queue_size_, + napi_env env_, + void* finalize_data_, + napi_finalize finalize_cb_, + napi_threadsafe_function_call_js call_js_cb_): + AsyncResource(env_->isolate, + resource, + *v8::String::Utf8Value(env_->isolate, name)), + thread_count(thread_count_), + is_closing(false), + context(context_), + max_queue_size(max_queue_size_), + env(env_), + finalize_data(finalize_data_), + finalize_cb(finalize_cb_), + idle_running(false), + call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_), + handles_closing(false) { + ref.Reset(env->isolate, func); + node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this); + } + + ~TsFn() { + node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this); + } + + // These methods can be called from any thread. + + napi_status Push(void* data, napi_threadsafe_function_call_mode mode) { + node::Mutex::ScopedLock lock(this->mutex); + + while (queue.size() >= max_queue_size && + max_queue_size > 0 && + !is_closing) { + if (mode == napi_tsfn_nonblocking) { + return napi_queue_full; + } + cond->Wait(lock); + } + + if (is_closing) { + if (thread_count == 0) { + return napi_invalid_arg; + } else { + thread_count--; + return napi_closing; + } + } else { + if (uv_async_send(&async) != 0) { + return napi_generic_failure; + } + queue.push(data); + return napi_ok; + } + } + + napi_status Acquire() { + node::Mutex::ScopedLock lock(this->mutex); + + if (is_closing) { + return napi_closing; + } + + thread_count++; + + return napi_ok; + } + + napi_status Release(napi_threadsafe_function_release_mode mode) { + node::Mutex::ScopedLock lock(this->mutex); + + if (thread_count == 0) { + return napi_invalid_arg; + } + + thread_count--; + + if (thread_count == 0 || mode == napi_tsfn_abort) { + if (!is_closing) { + is_closing = (mode == napi_tsfn_abort); + if (is_closing) { + cond->Signal(lock); + } + if (uv_async_send(&async) != 0) { + return napi_generic_failure; + } + } + } + + return napi_ok; + } + + void EmptyQueueAndDelete() { + for (; !queue.empty() ; queue.pop()) { + call_js_cb(nullptr, nullptr, context, queue.front()); + } + delete this; + } + + // These methods must only be called from the loop thread. + + napi_status Init() { + TsFn* ts_fn = this; + + if (uv_async_init(env->loop, &async, AsyncCb) == 0) { + if (max_queue_size > 0) { + cond.reset(new node::ConditionVariable); + } + if ((max_queue_size == 0 || cond.get() != nullptr) && + uv_idle_init(env->loop, &idle) == 0) { + return napi_ok; + } + + node::Environment::GetCurrent(env->isolate)->CloseHandle( + reinterpret_cast(&async), + [] (uv_handle_t* handle) -> void { + TsFn* ts_fn = + node::ContainerOf(&TsFn::async, + reinterpret_cast(handle)); + delete ts_fn; + }); + + // Prevent the thread-safe function from being deleted here, because + // the callback above will delete it. + ts_fn = nullptr; + } + + delete ts_fn; + + return napi_generic_failure; + } + + napi_status Unref() { + uv_unref(reinterpret_cast(&async)); + uv_unref(reinterpret_cast(&idle)); + + return napi_ok; + } + + napi_status Ref() { + uv_ref(reinterpret_cast(&async)); + uv_ref(reinterpret_cast(&idle)); + + return napi_ok; + } + + void DispatchOne() { + void* data; + bool popped_value = false; + bool idle_stop_failed = false; + + { + node::Mutex::ScopedLock lock(this->mutex); + if (is_closing) { + CloseHandlesAndMaybeDelete(); + } else { + size_t size = queue.size(); + if (size > 0) { + data = queue.front(); + queue.pop(); + popped_value = true; + if (size == max_queue_size && max_queue_size > 0) { + cond->Signal(lock); + } + size--; + } + + if (size == 0) { + if (thread_count == 0) { + is_closing = true; + cond->Signal(lock); + CloseHandlesAndMaybeDelete(); + } else { + if (uv_idle_stop(&idle) != 0) { + idle_stop_failed = true; + } else { + idle_running = false; + } + } + } + } + } + + if (popped_value || idle_stop_failed) { + v8::HandleScope scope(env->isolate); + CallbackScope cb_scope(this); + + if (idle_stop_failed) { + CHECK(napi_throw_error(env, + "ERR_NAPI_TSFN_STOP_IDLE_LOOP", + "Failed to stop the idle loop") == napi_ok); + } else { + v8::Local js_cb = + v8::Local::New(env->isolate, ref); + call_js_cb(env, + v8impl::JsValueFromV8LocalValue(js_cb), + context, + data); + } + } + } + + node::Environment* NodeEnv() { + // For some reason grabbing the Node.js environment requires a handle scope. + v8::HandleScope scope(env->isolate); + return node::Environment::GetCurrent(env->isolate); + } + + void MaybeStartIdle() { + if (!idle_running) { + if (uv_idle_start(&idle, IdleCb) != 0) { + v8::HandleScope scope(env->isolate); + CallbackScope cb_scope(this); + CHECK(napi_throw_error(env, + "ERR_NAPI_TSFN_START_IDLE_LOOP", + "Failed to start the idle loop") == napi_ok); + } + } + } + + void Finalize() { + v8::HandleScope scope(env->isolate); + if (finalize_cb) { + CallbackScope cb_scope(this); + finalize_cb(env, finalize_data, context); + } + EmptyQueueAndDelete(); + } + + inline void* Context() { + return context; + } + + // Default way of calling into JavaScript. Used when TsFn is constructed + // without a call_js_cb_. + static void CallJs(napi_env env, napi_value cb, void* context, void* data) { + if (env && cb) { + napi_value recv; + napi_status status; + + status = napi_get_undefined(env, &recv); + if (status != napi_ok) { + napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED", + "Failed to retrieve undefined value"); + return; + } + + status = napi_call_function(env, recv, cb, 0, nullptr, nullptr); + if (status != napi_ok && status != napi_pending_exception) { + napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS", + "Failed to call JS callback"); + return; + } + } + } + + static void IdleCb(uv_idle_t* idle) { + TsFn* ts_fn = + node::ContainerOf(&TsFn::idle, idle); + ts_fn->DispatchOne(); + } + + static void AsyncCb(uv_async_t* async) { + TsFn* ts_fn = + node::ContainerOf(&TsFn::async, async); + ts_fn->MaybeStartIdle(); + } + + static void Cleanup(void* data) { + reinterpret_cast(data)->CloseHandlesAndMaybeDelete(true); + } + + void CloseHandlesAndMaybeDelete(bool set_closing = false) { + if (set_closing) { + node::Mutex::ScopedLock lock(this->mutex); + is_closing = true; + cond->Signal(lock); + } + if (handles_closing) { + return; + } + handles_closing = true; + NodeEnv()->CloseHandle( + reinterpret_cast(&async), + [] (uv_handle_t* handle) -> void { + TsFn* ts_fn = node::ContainerOf(&TsFn::async, + reinterpret_cast(handle)); + ts_fn->NodeEnv()->CloseHandle( + reinterpret_cast(&ts_fn->idle), + [] (uv_handle_t* handle) -> void { + TsFn* ts_fn = node::ContainerOf(&TsFn::idle, + reinterpret_cast(handle)); + ts_fn->Finalize(); + }); + }); + } + + private: + // These are variables protected by the mutex. + node::Mutex mutex; + std::unique_ptr cond; + std::queue queue; + uv_async_t async; + uv_idle_t idle; + size_t thread_count; + bool is_closing; + + // These are variables set once, upon creation, and then never again, which + // means we don't need the mutex to read them. + void* context; + size_t max_queue_size; + + // These are variables accessed only from the loop thread. + node::Persistent ref; + napi_env env; + void* finalize_data; + napi_finalize finalize_cb; + bool idle_running; + napi_async_context async_context; + napi_threadsafe_function_call_js call_js_cb; + bool handles_closing; +}; + +NAPI_EXTERN napi_status +napi_create_threadsafe_function(napi_env env, + napi_value func, + napi_value async_resource, + napi_value async_resource_name, + size_t max_queue_size, + size_t initial_thread_count, + void* thread_finalize_data, + napi_finalize thread_finalize_cb, + void* context, + napi_threadsafe_function_call_js call_js_cb, + napi_threadsafe_function* result) { + CHECK_ENV(env); + CHECK_ARG(env, func); + CHECK_ARG(env, async_resource_name); + RETURN_STATUS_IF_FALSE(env, initial_thread_count > 0, napi_invalid_arg); + CHECK_ARG(env, result); + + napi_status status = napi_ok; + + v8::Local v8_func; + CHECK_TO_FUNCTION(env, v8_func, func); + + v8::Local v8_context = env->isolate->GetCurrentContext(); + + v8::Local v8_resource; + if (async_resource == nullptr) { + v8_resource = v8::Object::New(env->isolate); + } else { + CHECK_TO_OBJECT(env, v8_context, v8_resource, async_resource); + } + + v8::Local v8_name; + CHECK_TO_STRING(env, v8_context, v8_name, async_resource_name); + + TsFn* ts_fn = new TsFn(v8_func, + v8_resource, + v8_name, + initial_thread_count, + context, + max_queue_size, + env, + thread_finalize_data, + thread_finalize_cb, + call_js_cb); + + if (ts_fn == nullptr) { + status = napi_generic_failure; + } else { + // Init deletes ts_fn upon failure. + status = ts_fn->Init(); + if (status == napi_ok) { + *result = reinterpret_cast(ts_fn); + } + } + + return napi_set_last_error(env, status); +} + +NAPI_EXTERN napi_status +napi_get_threadsafe_function_context(napi_threadsafe_function func, + void** result) { + CHECK(func != nullptr); + CHECK(result != nullptr); + + *result = reinterpret_cast(func)->Context(); + return napi_ok; +} + +NAPI_EXTERN napi_status +napi_call_threadsafe_function(napi_threadsafe_function func, + void* data, + napi_threadsafe_function_call_mode is_blocking) { + CHECK(func != nullptr); + return reinterpret_cast(func)->Push(data, is_blocking); +} + +NAPI_EXTERN napi_status +napi_acquire_threadsafe_function(napi_threadsafe_function func) { + CHECK(func != nullptr); + return reinterpret_cast(func)->Acquire(); +} + +NAPI_EXTERN napi_status +napi_release_threadsafe_function(napi_threadsafe_function func, + napi_threadsafe_function_release_mode mode) { + CHECK(func != nullptr); + return reinterpret_cast(func)->Release(mode); +} + +NAPI_EXTERN napi_status +napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func) { + CHECK(func != nullptr); + return reinterpret_cast(func)->Unref(); +} + +NAPI_EXTERN napi_status +napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func) { + CHECK(func != nullptr); + return reinterpret_cast(func)->Ref(); +} diff --git a/src/node_api.h b/src/node_api.h index 91c2775a03e..84706ac3ed6 100644 --- a/src/node_api.h +++ b/src/node_api.h @@ -614,6 +614,44 @@ NAPI_EXTERN napi_status napi_run_script(napi_env env, NAPI_EXTERN napi_status napi_get_uv_event_loop(napi_env env, struct uv_loop_s** loop); +#ifdef NAPI_EXPERIMENTAL +// Calling into JS from other threads +NAPI_EXTERN napi_status +napi_create_threadsafe_function(napi_env env, + napi_value func, + napi_value async_resource, + napi_value async_resource_name, + size_t max_queue_size, + size_t initial_thread_count, + void* thread_finalize_data, + napi_finalize thread_finalize_cb, + void* context, + napi_threadsafe_function_call_js call_js_cb, + napi_threadsafe_function* result); + +NAPI_EXTERN napi_status +napi_get_threadsafe_function_context(napi_threadsafe_function func, + void** result); + +NAPI_EXTERN napi_status +napi_call_threadsafe_function(napi_threadsafe_function func, + void* data, + napi_threadsafe_function_call_mode is_blocking); + +NAPI_EXTERN napi_status +napi_acquire_threadsafe_function(napi_threadsafe_function func); + +NAPI_EXTERN napi_status +napi_release_threadsafe_function(napi_threadsafe_function func, + napi_threadsafe_function_release_mode mode); + +NAPI_EXTERN napi_status +napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func); + +NAPI_EXTERN napi_status +napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func); + +#endif // NAPI_EXPERIMENTAL EXTERN_C_END #endif // SRC_NODE_API_H_ diff --git a/src/node_api_types.h b/src/node_api_types.h index f7f3ee62755..af7d7c7f953 100644 --- a/src/node_api_types.h +++ b/src/node_api_types.h @@ -20,6 +20,9 @@ typedef struct napi_callback_info__* napi_callback_info; typedef struct napi_async_context__* napi_async_context; typedef struct napi_async_work__* napi_async_work; typedef struct napi_deferred__* napi_deferred; +#ifdef NAPI_EXPERIMENTAL +typedef struct napi_threadsafe_function__* napi_threadsafe_function; +#endif // NAPI_EXPERIMENTAL typedef enum { napi_default = 0, @@ -72,9 +75,25 @@ typedef enum { napi_cancelled, napi_escape_called_twice, napi_handle_scope_mismatch, - napi_callback_scope_mismatch + napi_callback_scope_mismatch, +#ifdef NAPI_EXPERIMENTAL + napi_queue_full, + napi_closing, +#endif // NAPI_EXPERIMENTAL } napi_status; +#ifdef NAPI_EXPERIMENTAL +typedef enum { + napi_tsfn_release, + napi_tsfn_abort +} napi_threadsafe_function_release_mode; + +typedef enum { + napi_tsfn_nonblocking, + napi_tsfn_blocking +} napi_threadsafe_function_call_mode; +#endif // NAPI_EXPERIMENTAL + typedef napi_value (*napi_callback)(napi_env env, napi_callback_info info); typedef void (*napi_finalize)(napi_env env, @@ -86,6 +105,13 @@ typedef void (*napi_async_complete_callback)(napi_env env, napi_status status, void* data); +#ifdef NAPI_EXPERIMENTAL +typedef void (*napi_threadsafe_function_call_js)(napi_env env, + napi_value js_callback, + void* context, + void* data); +#endif // NAPI_EXPERIMENTAL + typedef struct { // One of utf8name or name should be NULL. const char* utf8name; diff --git a/test/addons-napi/test_threadsafe_function/binding.c b/test/addons-napi/test_threadsafe_function/binding.c new file mode 100644 index 00000000000..929f5a1b2f9 --- /dev/null +++ b/test/addons-napi/test_threadsafe_function/binding.c @@ -0,0 +1,254 @@ +// For the purpose of this test we use libuv's threading library. When deciding +// on a threading library for a new project it bears remembering that in the +// future libuv may introduce API changes which may render it non-ABI-stable, +// which, in turn, may affect the ABI stability of the project despite its use +// of N-API. +#include +#define NAPI_EXPERIMENTAL +#include +#include "../common.h" + +#define ARRAY_LENGTH 10 + +static uv_thread_t uv_threads[2]; +static napi_threadsafe_function ts_fn; + +typedef struct { + napi_threadsafe_function_call_mode block_on_full; + napi_threadsafe_function_release_mode abort; + bool start_secondary; + napi_ref js_finalize_cb; +} ts_fn_hint; + +static ts_fn_hint ts_info; + +// Thread data to transmit to JS +static int ints[ARRAY_LENGTH]; + +static void secondary_thread(void* data) { + napi_threadsafe_function ts_fn = data; + + if (napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) { + napi_fatal_error("secondary_thread", NAPI_AUTO_LENGTH, + "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH); + } +} + +// Source thread producing the data +static void data_source_thread(void* data) { + napi_threadsafe_function ts_fn = data; + int index; + void* hint; + ts_fn_hint *ts_fn_info; + napi_status status; + bool queue_was_full = false; + bool queue_was_closing = false; + + if (napi_get_threadsafe_function_context(ts_fn, &hint) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_get_threadsafe_function_context failed", NAPI_AUTO_LENGTH); + } + + ts_fn_info = (ts_fn_hint *)hint; + + if (ts_fn_info != &ts_info) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "thread-safe function hint is not as expected", NAPI_AUTO_LENGTH); + } + + if (ts_fn_info->start_secondary) { + if (napi_acquire_threadsafe_function(ts_fn) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_acquire_threadsafe_function failed", NAPI_AUTO_LENGTH); + } + + if (uv_thread_create(&uv_threads[1], secondary_thread, ts_fn) != 0) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "failed to start secondary thread", NAPI_AUTO_LENGTH); + } + } + + for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) { + status = napi_call_threadsafe_function(ts_fn, &ints[index], + ts_fn_info->block_on_full); + switch (status) { + case napi_queue_full: + queue_was_full = true; + index++; + // fall through + + case napi_ok: + continue; + + case napi_closing: + queue_was_closing = true; + break; + + default: + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_call_threadsafe_function failed", NAPI_AUTO_LENGTH); + } + } + + // Assert that the enqueuing of a value was refused at least once, if this is + // a non-blocking test run. + if (!ts_fn_info->block_on_full && !queue_was_full) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "queue was never full", NAPI_AUTO_LENGTH); + } + + // Assert that the queue was marked as closing at least once, if this is an + // aborting test run. + if (ts_fn_info->abort == napi_tsfn_abort && !queue_was_closing) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "queue was never closing", NAPI_AUTO_LENGTH); + } + + if (!queue_was_closing && + napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH); + } +} + +// Getting the data into JS +static void call_js(napi_env env, napi_value cb, void* hint, void* data) { + if (cb != NULL) { + napi_value argv, undefined; + NAPI_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv)); + NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined)); + NAPI_CALL_RETURN_VOID(env, napi_call_function(env, undefined, cb, 1, &argv, + NULL)); + } +} + +// Cleanup +static napi_value StopThread(napi_env env, napi_callback_info info) { + size_t argc = 2; + napi_value argv[2]; + NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL)); + napi_valuetype value_type; + NAPI_CALL(env, napi_typeof(env, argv[0], &value_type)); + NAPI_ASSERT(env, value_type == napi_function, + "StopThread argument is a function"); + NAPI_ASSERT(env, (ts_fn != NULL), "Existing threadsafe function"); + NAPI_CALL(env, + napi_create_reference(env, argv[0], 1, &(ts_info.js_finalize_cb))); + bool abort; + NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort)); + NAPI_CALL(env, + napi_release_threadsafe_function(ts_fn, + abort ? napi_tsfn_abort : napi_tsfn_release)); + ts_fn = NULL; + return NULL; +} + +// Join the thread and inform JS that we're done. +static void join_the_threads(napi_env env, void *data, void *hint) { + uv_thread_t *the_threads = data; + ts_fn_hint *the_hint = hint; + napi_value js_cb, undefined; + + uv_thread_join(&the_threads[0]); + if (the_hint->start_secondary) { + uv_thread_join(&the_threads[1]); + } + + NAPI_CALL_RETURN_VOID(env, + napi_get_reference_value(env, the_hint->js_finalize_cb, &js_cb)); + NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined)); + NAPI_CALL_RETURN_VOID(env, + napi_call_function(env, undefined, js_cb, 0, NULL, NULL)); + NAPI_CALL_RETURN_VOID(env, napi_delete_reference(env, + the_hint->js_finalize_cb)); +} + +static napi_value StartThreadInternal(napi_env env, + napi_callback_info info, + napi_threadsafe_function_call_js cb, + bool block_on_full) { + size_t argc = 3; + napi_value argv[3]; + + ts_info.block_on_full = + (block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking); + + NAPI_ASSERT(env, (ts_fn == NULL), "Existing thread-safe function"); + NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL)); + napi_value async_name; + NAPI_CALL(env, napi_create_string_utf8(env, "N-API Thread-safe Function Test", + NAPI_AUTO_LENGTH, &async_name)); + NAPI_CALL(env, napi_create_threadsafe_function(env, argv[0], NULL, async_name, + 2, 2, uv_threads, join_the_threads, &ts_info, cb, &ts_fn)); + bool abort; + NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort)); + ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release; + NAPI_CALL(env, napi_get_value_bool(env, argv[2], &(ts_info.start_secondary))); + + NAPI_ASSERT(env, + (uv_thread_create(&uv_threads[0], data_source_thread, ts_fn) == 0), + "Thread creation"); + + return NULL; +} + +static napi_value Unref(napi_env env, napi_callback_info info) { + NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function"); + NAPI_CALL(env, napi_unref_threadsafe_function(env, ts_fn)); + return NULL; +} + +static napi_value Release(napi_env env, napi_callback_info info) { + NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function"); + NAPI_CALL(env, napi_release_threadsafe_function(ts_fn, napi_tsfn_release)); + return NULL; +} + +// Startup +static napi_value StartThread(napi_env env, napi_callback_info info) { + return StartThreadInternal(env, info, call_js, true); +} + +static napi_value StartThreadNonblocking(napi_env env, + napi_callback_info info) { + return StartThreadInternal(env, info, call_js, false); +} + +static napi_value StartThreadNoNative(napi_env env, napi_callback_info info) { + return StartThreadInternal(env, info, NULL, true); +} + +// Module init +static napi_value Init(napi_env env, napi_value exports) { + size_t index; + for (index = 0; index < ARRAY_LENGTH; index++) { + ints[index] = index; + } + napi_value js_array_length; + napi_create_uint32(env, ARRAY_LENGTH, &js_array_length); + + napi_property_descriptor properties[] = { + { + "ARRAY_LENGTH", + NULL, + NULL, + NULL, + NULL, + js_array_length, + napi_enumerable, + NULL + }, + DECLARE_NAPI_PROPERTY("StartThread", StartThread), + DECLARE_NAPI_PROPERTY("StartThreadNoNative", StartThreadNoNative), + DECLARE_NAPI_PROPERTY("StartThreadNonblocking", StartThreadNonblocking), + DECLARE_NAPI_PROPERTY("StopThread", StopThread), + DECLARE_NAPI_PROPERTY("Unref", Unref), + DECLARE_NAPI_PROPERTY("Release", Release), + }; + + NAPI_CALL(env, napi_define_properties(env, exports, + sizeof(properties)/sizeof(properties[0]), properties)); + + return exports; +} +NAPI_MODULE(NODE_GYP_MODULE_NAME, Init) diff --git a/test/addons-napi/test_threadsafe_function/binding.gyp b/test/addons-napi/test_threadsafe_function/binding.gyp new file mode 100644 index 00000000000..b60352e05af --- /dev/null +++ b/test/addons-napi/test_threadsafe_function/binding.gyp @@ -0,0 +1,8 @@ +{ + 'targets': [ + { + 'target_name': 'binding', + 'sources': ['binding.c'] + } + ] +} diff --git a/test/addons-napi/test_threadsafe_function/test.js b/test/addons-napi/test_threadsafe_function/test.js new file mode 100644 index 00000000000..c82b4992fdf --- /dev/null +++ b/test/addons-napi/test_threadsafe_function/test.js @@ -0,0 +1,166 @@ +'use strict'; + +const common = require('../../common'); +const assert = require('assert'); +const binding = require(`./build/${common.buildType}/binding`); +const { fork } = require('child_process'); +const expectedArray = (function(arrayLength) { + const result = []; + for (let index = 0; index < arrayLength; index++) { + result.push(arrayLength - 1 - index); + } + return result; +})(binding.ARRAY_LENGTH); + +common.crashOnUnhandledRejection(); + +// Handle the rapid teardown test case as the child process. We unref the +// thread-safe function after we have received two values. This causes the +// process to exit and the environment cleanup handler to be invoked. +if (process.argv[2] === 'child') { + let callCount = 0; + binding.StartThread((value) => { + callCount++; + console.log(value); + if (callCount === 2) { + binding.Unref(); + } + }, false /* abort */, true /* launchSecondary */); + + // Release the thread-safe function from the main thread so that it may be + // torn down via the environment cleanup handler. + binding.Release(); + return; +} + +function testWithJSMarshaller({ + threadStarter, + quitAfter, + abort, + launchSecondary }) { + return new Promise((resolve) => { + const array = []; + binding[threadStarter](function testCallback(value) { + array.push(value); + if (array.length === quitAfter) { + setImmediate(() => { + binding.StopThread(common.mustCall(() => { + resolve(array); + }), !!abort); + }); + } + }, !!abort, !!launchSecondary); + if (threadStarter === 'StartThreadNonblocking') { + // Let's make this thread really busy for a short while to ensure that + // the queue fills and the thread receives a napi_queue_full. + const start = Date.now(); + while (Date.now() - start < 200); + } + }); +} + +new Promise(function testWithoutJSMarshaller(resolve) { + let callCount = 0; + binding.StartThreadNoNative(function testCallback() { + callCount++; + + // The default call-into-JS implementation passes no arguments. + assert.strictEqual(arguments.length, 0); + if (callCount === binding.ARRAY_LENGTH) { + setImmediate(() => { + binding.StopThread(common.mustCall(() => { + resolve(); + }), false); + }); + } + }, false /* abort */, false /* launchSecondary */); +}) + +// Start the thread in blocking mode, and assert that all values are passed. +// Quit after it's done. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: 5 +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in non-blocking mode, and assert that all values are passed. +// Quit after it's done. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + quitAfter: 5 +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: 1 +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in non-blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + quitAfter: 1 +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. Launch a secondary thread to test the +// reference counter incrementing functionality. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: 1, + launchSecondary: true +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in non-blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. Launch a secondary thread to test the +// reference counter incrementing functionality. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + quitAfter: 1, + launchSecondary: true +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode, and assert that it could not finish. +// Quit early and aborting. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: 1, + abort: true +})) +.then((result) => assert.strictEqual(result.indexOf(0), -1)) + +// Start the thread in non-blocking mode, and assert that it could not finish. +// Quit early and aborting. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + quitAfter: 1, + abort: true +})) +.then((result) => assert.strictEqual(result.indexOf(0), -1)) + +// Start a child process to test rapid teardown +.then(() => { + return new Promise((resolve, reject) => { + let output = ''; + const child = fork(__filename, ['child'], { + stdio: [process.stdin, 'pipe', process.stderr, 'ipc'] + }); + child.on('close', (code) => { + if (code === 0) { + resolve(output.match(/\S+/g)); + } else { + reject(new Error('Child process died with code ' + code)); + } + }); + child.stdout.on('data', (data) => (output += data.toString())); + }); +}) +.then((result) => assert.strictEqual(result.indexOf(0), -1));