From 2e153724e5a6363917d5ca1fa8fc3810865d819d Mon Sep 17 00:00:00 2001 From: Gabriel Schulhof Date: Sun, 17 Jun 2018 11:52:49 -0400 Subject: [PATCH 01/10] n-api: add API for asynchronous functions Bundle a `uv_async_t`, a `uv_idle_t`, a `uv_mutex_t`, a `uv_cond_t`, and a `v8::Persistent` to make it possible to call into JS from another thread. The API accepts a void data pointer and a callback which will be invoked on the loop thread and which will receive the `napi_value` representing the JavaScript function to call so as to perform the call into JS. The callback is run inside a `node::CallbackScope`. A `std::queue` is used to store calls from the secondary threads, and an idle loop is started by the `uv_async_t` callback on the loop thread to drain the queue, calling into JS with each item. Items can be added to the queue blockingly or non-blockingly. The thread-safe function can be referenced or unreferenced, with the same semantics as libuv handles. Re: https://github.com/nodejs/help/issues/1035 Re: https://github.com/nodejs/node/issues/20964 Fixes: https://github.com/nodejs/node/issues/13512 PR-URL: https://github.com/nodejs/node/pull/17887 Reviewed-By: Matteo Collina Reviewed-By: Michael Dawson --- doc/api/errors.md | 25 + doc/api/n-api.md | 372 ++++++++++++++- src/node_api.cc | 443 +++++++++++++++++- src/node_api.h | 38 ++ src/node_api_types.h | 28 +- .../test_threadsafe_function/binding.c | 254 ++++++++++ .../test_threadsafe_function/binding.gyp | 8 + .../test_threadsafe_function/test.js | 166 +++++++ 8 files changed, 1330 insertions(+), 4 deletions(-) create mode 100644 test/addons-napi/test_threadsafe_function/binding.c create mode 100644 test/addons-napi/test_threadsafe_function/binding.gyp create mode 100644 test/addons-napi/test_threadsafe_function/test.js diff --git a/doc/api/errors.md b/doc/api/errors.md index 53ef8395ac7065..590c504ae68edf 100755 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -1107,6 +1107,31 @@ multiple of the element size. While calling `napi_create_typedarray()`, `(length * size_of_element) + byte_offset` was larger than the length of given `buffer`. + +### ERR_NAPI_TSFN_CALL_JS + +An error occurred while invoking the JavaScript portion of the thread-safe +function. + + +### ERR_NAPI_TSFN_GET_UNDEFINED + +An error occurred while attempting to retrieve the JavaScript `undefined` +value. + + +### ERR_NAPI_TSFN_START_IDLE_LOOP + +On the main thread, values are removed from the queue associated with the +thread-safe function in an idle loop. This error indicates that an error +has occurred when attemping to start the loop. + + +### ERR_NAPI_TSFN_STOP_IDLE_LOOP + +Once no more items are left in the queue, the idle loop must be suspended. This +error indicates that the idle loop has failed to stop. + ### ERR_NO_ICU diff --git a/doc/api/n-api.md b/doc/api/n-api.md index 1e43e4f224d9f0..7e7ed9efd3e88a 100644 --- a/doc/api/n-api.md +++ b/doc/api/n-api.md @@ -90,7 +90,11 @@ typedef enum { napi_cancelled, napi_escape_called_twice, napi_handle_scope_mismatch, - napi_callback_scope_mismatch + napi_callback_scope_mismatch, +#ifdef NAPI_EXPERIMENTAL + napi_queue_full, + napi_closing, +#endif // NAPI_EXPERIMENTAL } napi_status; ``` If additional information is required upon an API returning a failed status, @@ -128,6 +132,43 @@ not allowed. ### napi_value This is an opaque pointer that is used to represent a JavaScript value. +### napi_threadsafe_function + +> Stability: 1 - Experimental + +This is an opaque pointer that represents a JavaScript function which can be +called asynchronously from multiple threads via +`napi_call_threadsafe_function()`. + +### napi_threadsafe_function_release_mode + +> Stability: 1 - Experimental + +A value to be given to `napi_release_threadsafe_function()` to indicate whether +the thread-safe function is to be closed immediately (`napi_tsfn_abort`) or +merely released (`napi_tsfn_release`) and thus available for subsequent use via +`napi_acquire_threadsafe_function()` and `napi_call_threadsafe_function()`. +```C +typedef enum { + napi_tsfn_release, + napi_tsfn_abort +} napi_threadsafe_function_release_mode; +``` + +### napi_threadsafe_function_call_mode + +> Stability: 1 - Experimental + +A value to be given to `napi_call_threadsafe_function()` to indicate whether +the call should block whenever the queue associated with the thread-safe +function is full. +```C +typedef enum { + napi_tsfn_nonblocking, + napi_tsfn_blocking +} napi_threadsafe_function_call_mode; +``` + ### N-API Memory Management types #### napi_handle_scope This is an abstraction used to control and modify the lifetime of objects @@ -205,6 +246,43 @@ typedef void (*napi_async_complete_callback)(napi_env env, void* data); ``` +#### napi_threadsafe_function_call_js + +> Stability: 1 - Experimental + +Function pointer used with asynchronous thread-safe function calls. The callback +will be called on the main thread. Its purpose is to use a data item arriving +via the queue from one of the secondary threads to construct the parameters +necessary for a call into JavaScript, usually via `napi_call_function`, and then +make the call into JavaScript. + +The data arriving from the secondary thread via the queue is given in the `data` +parameter and the JavaScript function to call is given in the `js_callback` +parameter. + +N-API sets up the environment prior to calling this callback, so it is +sufficient to call the JavaScript function via `napi_call_function` rather than +via `napi_make_callback`. + +Callback functions must satisfy the following signature: +```C +typedef void (*napi_threadsafe_function_call_js)(napi_env env, + napi_value js_callback, + void* context, + void* data); +``` +- `[in] env`: The environment to use for API calls, or `NULL` if the thread-safe +function is being torn down and `data` may need to be freed. +- `[in] js_callback`: The JavaScript function to call, or `NULL` if the +thread-safe function is being torn down and `data` may need to be freed. +- `[in] context`: The optional data with which the thread-safe function was +created. +- `[in] data`: Data created by the secondary thread. It is the responsibility of +the callback to convert this native data to JavaScript values (with N-API +functions) that can be passed as parameters when `js_callback` is invoked. This +pointer is managed entirely by the threads and this callback. Thus this callback +should free the data. + ## Error Handling N-API uses both return values and JavaScript exceptions for error handling. The following sections explain the approach for each case. @@ -3837,6 +3915,296 @@ NAPI_EXTERN napi_status napi_get_uv_event_loop(napi_env env, - `[in] env`: The environment that the API is invoked under. - `[out] loop`: The current libuv loop instance. +## Asynchronous Thread-safe Function Calls + +> Stability: 1 - Experimental + +JavaScript functions can normally only be called from a native addon's main +thread. If an addon creates additional threads, then N-API functions that +require a `napi_env`, `napi_value`, or `napi_ref` must not be called from those +threads. + +When an addon has additional threads and JavaScript functions need to be invoked +based on the processing completed by those threads, those threads must +communicate with the addon's main thread so that the main thread can invoke the +JavaScript function on their behalf. The thread-safe function APIs provide an +easy way to do this. + +These APIs provide the type `napi_threadsafe_function` as well as APIs to +create, destroy, and call objects of this type. +`napi_create_threadsafe_function()` creates a persistent reference to a +`napi_value` that holds a JavaScript function which can be called from multiple +threads. The calls happen asynchronously. This means that values with which the +JavaScript callback is to be called will be placed in a queue, and, for each +value in the queue, a call will eventually be made to the JavaScript function. + +Upon creation of a `napi_threadsafe_function` a `napi_finalize` callback can be +provided. This callback will be invoked on the main thread when the thread-safe +function is about to be destroyed. It receives the context and the finalize data +given during construction, and provides an opportunity for cleaning up after the +threads e.g. by calling `uv_thread_join()`. **It is important that, aside from +the main loop thread, there be no threads left using the thread-safe function +after the finalize callback completes.** + +The `context` given during the call to `napi_create_threadsafe_function()` can +be retrieved from any thread with a call to +`napi_get_threadsafe_function_context()`. + +`napi_call_threadsafe_function()` can then be used for initiating a call into +JavaScript. `napi_call_threadsafe_function()` accepts a parameter which controls +whether the API behaves blockingly. If set to `napi_tsfn_nonblocking`, the API +behaves non-blockingly, returning `napi_queue_full` if the queue was full, +preventing data from being successfully added to the queue. If set to +`napi_tsfn_blocking`, the API blocks until space becomes available in the queue. +`napi_call_threadsafe_function()` never blocks if the thread-safe function was +created with a maximum queue size of 0. + +The actual call into JavaScript is controlled by the callback given via the +`call_js_cb` parameter. `call_js_cb` is invoked on the main thread once for each +value that was placed into the queue by a successful call to +`napi_call_threadsafe_function()`. If such a callback is not given, a default +callback will be used, and the resulting JavaScript call will have no arguments. +The `call_js_cb` callback receives the JavaScript function to call as a +`napi_value` in its parameters, as well as the `void*` context pointer used when +creating the `napi_threadsafe_function`, and the next data pointer that was +created by one of the secondary threads. The callback can then use an API such +as `napi_call_function()` to call into JavaScript. + +The callback may also be invoked with `env` and `call_js_cb` both set to `NULL` +to indicate that calls into JavaScript are no longer possible, while items +remain in the queue that may need to be freed. This normally occurs when the +Node.js process exits while there is a thread-safe function still active. + +It is not necessary to call into JavaScript via `napi_make_callback()` because +N-API runs `call_js_cb` in a context appropriate for callbacks. + +Threads can be added to and removed from a `napi_threadsafe_function` object +during its existence. Thus, in addition to specifying an initial number of +threads upon creation, `napi_acquire_threadsafe_function` can be called to +indicate that a new thread will start making use of the thread-safe function. +Similarly, `napi_release_threadsafe_function` can be called to indicate that an +existing thread will stop making use of the thread-safe function. + +`napi_threadsafe_function` objects are destroyed when every thread which uses +the object has called `napi_release_threadsafe_function()` or has received a +return status of `napi_closing` in response to a call to +`napi_call_threadsafe_function`. The queue is emptied before the +`napi_threadsafe_function` is destroyed. It is important that +`napi_release_threadsafe_function()` be the last API call made in conjunction +with a given `napi_threadsafe_function`, because after the call completes, there +is no guarantee that the `napi_threadsafe_function` is still allocated. For the +same reason it is also important that no more use be made of a thread-safe +function after receiving a return value of `napi_closing` in response to a call +to `napi_call_threadsafe_function`. Data associated with the +`napi_threadsafe_function` can be freed in its `napi_finalize` callback which +was passed to `napi_create_threadsafe_function()`. + +Once the number of threads making use of a `napi_threadsafe_function` reaches +zero, no further threads can start making use of it by calling +`napi_acquire_threadsafe_function()`. In fact, all subsequent API calls +associated with it, except `napi_release_threadsafe_function()`, will return an +error value of `napi_closing`. + +The thread-safe function can be "aborted" by giving a value of `napi_tsfn_abort` +to `napi_release_threadsafe_function()`. This will cause all subsequent APIs +associated with the thread-safe function except +`napi_release_threadsafe_function()` to return `napi_closing` even before its +reference count reaches zero. In particular, `napi_call_threadsafe_function()` +will return `napi_closing`, thus informing the threads that it is no longer +possible to make asynchronous calls to the thread-safe function. This can be +used as a criterion for terminating the thread. **Upon receiving a return value +of `napi_closing` from `napi_call_threadsafe_function()` a thread must make no +further use of the thread-safe function because it is no longer guaranteed to +be allocated.** + +Similarly to libuv handles, thread-safe functions can be "referenced" and +"unreferenced". A "referenced" thread-safe function will cause the event loop on +the thread on which it is created to remain alive until the thread-safe function +is destroyed. In contrast, an "unreferenced" thread-safe function will not +prevent the event loop from exiting. The APIs `napi_ref_threadsafe_function` and +`napi_unref_threadsafe_function` exist for this purpose. + +### napi_create_threadsafe_function + +> Stability: 1 - Experimental + + +```C +NAPI_EXTERN napi_status +napi_create_threadsafe_function(napi_env env, + napi_value func, + napi_value async_resource, + napi_value async_resource_name, + size_t max_queue_size, + size_t initial_thread_count, + void* thread_finalize_data, + napi_finalize thread_finalize_cb, + void* context, + napi_threadsafe_function_call_js call_js_cb, + napi_threadsafe_function* result); +``` + +- `[in] env`: The environment that the API is invoked under. +- `[in] func`: The JavaScript function to call from another thread. +- `[in] async_resource`: An optional object associated with the async work that +will be passed to possible `async_hooks` [`init` hooks][]. +- `[in] async_resource_name`: A javaScript string to provide an identifier for +the kind of resource that is being provided for diagnostic information exposed +by the `async_hooks` API. +- `[in] max_queue_size`: Maximum size of the queue. 0 for no limit. +- `[in] initial_thread_count`: The initial number of threads, including the main +thread, which will be making use of this function. +- `[in] thread_finalize_data`: Data to be passed to `thread_finalize_cb`. +- `[in] thread_finalize_cb`: Function to call when the +`napi_threadsafe_function` is being destroyed. +- `[in] context`: Optional data to attach to the resulting +`napi_threadsafe_function`. +- `[in] call_js_cb`: Optional callback which calls the JavaScript function in +response to a call on a different thread. This callback will be called on the +main thread. If not given, the JavaScript function will be called with no +parameters and with `undefined` as its `this` value. +- `[out] result`: The asynchronous thread-safe JavaScript function. + +### napi_get_threadsafe_function_context + +> Stability: 1 - Experimental + + +```C +NAPI_EXTERN napi_status +napi_get_threadsafe_function_context(napi_threadsafe_function func, + void** result); +``` + +- `[in] func`: The thread-safe function for which to retrieve the context. +- `[out] context`: The location where to store the context. + +This API may be called from any thread which makes use of `func`. + +### napi_call_threadsafe_function + +> Stability: 1 - Experimental + + +```C +NAPI_EXTERN napi_status +napi_call_threadsafe_function(napi_threadsafe_function func, + void* data, + napi_threadsafe_function_call_mode is_blocking); +``` + +- `[in] func`: The asynchronous thread-safe JavaScript function to invoke. +- `[in] data`: Data to send into JavaScript via the callback `call_js_cb` +provided during the creation of the thread-safe JavaScript function. +- `[in] is_blocking`: Flag whose value can be either `napi_tsfn_blocking` to +indicate that the call should block if the queue is full or +`napi_tsfn_nonblocking` to indicate that the call should return immediately with +a status of `napi_queue_full` whenever the queue is full. + +This API will return `napi_closing` if `napi_release_threadsafe_function()` was +called with `abort` set to `napi_tsfn_abort` from any thread. The value is only +added to the queue if the API returns `napi_ok`. + +This API may be called from any thread which makes use of `func`. + +### napi_acquire_threadsafe_function + +> Stability: 1 - Experimental + + +```C +NAPI_EXTERN napi_status +napi_acquire_threadsafe_function(napi_threadsafe_function func); +``` + +- `[in] func`: The asynchronous thread-safe JavaScript function to start making +use of. + +A thread should call this API before passing `func` to any other thread-safe +function APIs to indicate that it will be making use of `func`. This prevents +`func` from being destroyed when all other threads have stopped making use of +it. + +This API may be called from any thread which will start making use of `func`. + +### napi_release_threadsafe_function + +> Stability: 1 - Experimental + + +```C +NAPI_EXTERN napi_status +napi_release_threadsafe_function(napi_threadsafe_function func, + napi_threadsafe_function_release_mode mode); +``` + +- `[in] func`: The asynchronous thread-safe JavaScript function whose reference +count to decrement. +- `[in] mode`: Flag whose value can be either `napi_tsfn_release` to indicate +that the current thread will make no further calls to the thread-safe function, +or `napi_tsfn_abort` to indicate that in addition to the current thread, no +other thread should make any further calls to the thread-safe function. If set +to `napi_tsfn_abort`, further calls to `napi_call_threadsafe_function()` will +return `napi_closing`, and no further values will be placed in the queue. + +A thread should call this API when it stops making use of `func`. Passing `func` +to any thread-safe APIs after having called this API has undefined results, as +`func` may have been destroyed. + +This API may be called from any thread which will stop making use of `func`. + +### napi_ref_threadsafe_function + +> Stability: 1 - Experimental + + +```C +NAPI_EXTERN napi_status +napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func); +``` + +- `[in] env`: The environment that the API is invoked under. +- `[in] func`: The thread-safe function to reference. + +This API is used to indicate that the event loop running on the main thread +should not exit until `func` has been destroyed. Similar to [`uv_ref`][] it is +also idempotent. + +This API may only be called from the main thread. + +### napi_unref_threadsafe_function + +> Stability: 1 - Experimental + + +```C +NAPI_EXTERN napi_status +napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func); +``` + +- `[in] env`: The environment that the API is invoked under. +- `[in] func`: The thread-safe function to unreference. + +This API is used to indicate that the event loop running on the main thread +may exit before `func` is destroyed. Similar to [`uv_unref`][] it is also +idempotent. + +This API may only be called from the main thread. + [Basic N-API Data Types]: #n_api_basic_n_api_data_types [Custom Asynchronous Operations]: #n_api_custom_asynchronous_operations [ECMAScript Language Specification]: https://tc39.github.io/ecma262/ @@ -3899,5 +4267,7 @@ NAPI_EXTERN napi_status napi_get_uv_event_loop(napi_env env, [`napi_throw`]: #n_api_napi_throw [`napi_unwrap`]: #n_api_napi_unwrap [`napi_wrap`]: #n_api_napi_wrap +[`uv_ref`]: http://docs.libuv.org/en/v1.x/handle.html#c.uv_ref +[`uv_unref`]: http://docs.libuv.org/en/v1.x/handle.html#c.uv_unref [`process.release`]: process.html#process_process_release [async_hooks `type`]: async_hooks.html#async_hooks_type diff --git a/src/node_api.cc b/src/node_api.cc index 61e9f6b4b00279..20fb2f8c9a0af8 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -5,6 +5,7 @@ #include #include #include +#define NAPI_EXPERIMENTAL #include "node_api.h" #include "node_internals.h" #include "env.h" @@ -947,7 +948,10 @@ const char* error_messages[] = {nullptr, "The async work item was cancelled", "napi_escape_handle already called on scope", "Invalid handle scope usage", - "Invalid callback scope usage"}; + "Invalid callback scope usage", + "Thread-safe function queue is full", + "Thread-safe function handle is closing" +}; static inline napi_status napi_clear_last_error(napi_env env) { env->last_error.error_code = napi_ok; @@ -978,7 +982,7 @@ napi_status napi_get_last_error_info(napi_env env, // We don't have a napi_status_last as this would result in an ABI // change each time a message was added. static_assert( - node::arraysize(error_messages) == napi_callback_scope_mismatch + 1, + node::arraysize(error_messages) == napi_closing + 1, "Count of error messages must match count of error values"); CHECK_LE(env->last_error.error_code, napi_callback_scope_mismatch); @@ -3587,3 +3591,438 @@ napi_status napi_run_script(napi_env env, *result = v8impl::JsValueFromV8LocalValue(script_result.ToLocalChecked()); return GET_RETURN_STATUS(env); } + +class TsFn: public node::AsyncResource { + public: + TsFn(v8::Local func, + v8::Local resource, + v8::Local name, + size_t thread_count_, + void* context_, + size_t max_queue_size_, + napi_env env_, + void* finalize_data_, + napi_finalize finalize_cb_, + napi_threadsafe_function_call_js call_js_cb_): + AsyncResource(env_->isolate, + resource, + *v8::String::Utf8Value(env_->isolate, name)), + thread_count(thread_count_), + is_closing(false), + context(context_), + max_queue_size(max_queue_size_), + env(env_), + finalize_data(finalize_data_), + finalize_cb(finalize_cb_), + idle_running(false), + call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_), + handles_closing(false) { + ref.Reset(env->isolate, func); + node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this); + } + + ~TsFn() { + node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this); + if (ref.IsEmpty()) + return; + ref.ClearWeak(); + ref.Reset(); + } + + // These methods can be called from any thread. + + napi_status Push(void* data, napi_threadsafe_function_call_mode mode) { + node::Mutex::ScopedLock lock(this->mutex); + + while (queue.size() >= max_queue_size && + max_queue_size > 0 && + !is_closing) { + if (mode == napi_tsfn_nonblocking) { + return napi_queue_full; + } + cond->Wait(lock); + } + + if (is_closing) { + if (thread_count == 0) { + return napi_invalid_arg; + } else { + thread_count--; + return napi_closing; + } + } else { + if (uv_async_send(&async) != 0) { + return napi_generic_failure; + } + queue.push(data); + return napi_ok; + } + } + + napi_status Acquire() { + node::Mutex::ScopedLock lock(this->mutex); + + if (is_closing) { + return napi_closing; + } + + thread_count++; + + return napi_ok; + } + + napi_status Release(napi_threadsafe_function_release_mode mode) { + node::Mutex::ScopedLock lock(this->mutex); + + if (thread_count == 0) { + return napi_invalid_arg; + } + + thread_count--; + + if (thread_count == 0 || mode == napi_tsfn_abort) { + if (!is_closing) { + is_closing = (mode == napi_tsfn_abort); + if (is_closing) { + cond->Signal(lock); + } + if (uv_async_send(&async) != 0) { + return napi_generic_failure; + } + } + } + + return napi_ok; + } + + void EmptyQueueAndDelete() { + for (; !queue.empty() ; queue.pop()) { + call_js_cb(nullptr, nullptr, context, queue.front()); + } + delete this; + } + + // These methods must only be called from the loop thread. + + napi_status Init() { + TsFn* ts_fn = this; + + if (uv_async_init(env->loop, &async, AsyncCb) == 0) { + if (max_queue_size > 0) { + cond.reset(new node::ConditionVariable); + } + if ((max_queue_size == 0 || cond.get() != nullptr) && + uv_idle_init(env->loop, &idle) == 0) { + return napi_ok; + } + + uv_close(reinterpret_cast(&async), + [] (uv_handle_t* handle) -> void { + TsFn* ts_fn = + node::ContainerOf(&TsFn::async, + reinterpret_cast(handle)); + delete ts_fn; + }); + + // Prevent the thread-safe function from being deleted here, because + // the callback above will delete it. + ts_fn = nullptr; + } + + delete ts_fn; + + return napi_generic_failure; + } + + napi_status Unref() { + uv_unref(reinterpret_cast(&async)); + uv_unref(reinterpret_cast(&idle)); + + return napi_ok; + } + + napi_status Ref() { + uv_ref(reinterpret_cast(&async)); + uv_ref(reinterpret_cast(&idle)); + + return napi_ok; + } + + void DispatchOne() { + void* data; + bool popped_value = false; + bool idle_stop_failed = false; + + { + node::Mutex::ScopedLock lock(this->mutex); + if (is_closing) { + CloseHandlesAndMaybeDelete(); + } else { + size_t size = queue.size(); + if (size > 0) { + data = queue.front(); + queue.pop(); + popped_value = true; + if (size == max_queue_size && max_queue_size > 0) { + cond->Signal(lock); + } + size--; + } + + if (size == 0) { + if (thread_count == 0) { + is_closing = true; + cond->Signal(lock); + CloseHandlesAndMaybeDelete(); + } else { + if (uv_idle_stop(&idle) != 0) { + idle_stop_failed = true; + } else { + idle_running = false; + } + } + } + } + } + + if (popped_value || idle_stop_failed) { + v8::HandleScope scope(env->isolate); + CallbackScope cb_scope(this); + + if (idle_stop_failed) { + CHECK(napi_throw_error(env, + "ERR_NAPI_TSFN_STOP_IDLE_LOOP", + "Failed to stop the idle loop") == napi_ok); + } else { + v8::Local js_cb = + v8::Local::New(env->isolate, ref); + call_js_cb(env, + v8impl::JsValueFromV8LocalValue(js_cb), + context, + data); + } + } + } + + node::Environment* NodeEnv() { + // For some reason grabbing the Node.js environment requires a handle scope. + v8::HandleScope scope(env->isolate); + return node::Environment::GetCurrent(env->isolate); + } + + void MaybeStartIdle() { + if (!idle_running) { + if (uv_idle_start(&idle, IdleCb) != 0) { + v8::HandleScope scope(env->isolate); + CallbackScope cb_scope(this); + CHECK(napi_throw_error(env, + "ERR_NAPI_TSFN_START_IDLE_LOOP", + "Failed to start the idle loop") == napi_ok); + } + } + } + + void Finalize() { + v8::HandleScope scope(env->isolate); + if (finalize_cb) { + CallbackScope cb_scope(this); + finalize_cb(env, finalize_data, context); + } + EmptyQueueAndDelete(); + } + + inline void* Context() { + return context; + } + + void CloseHandlesAndMaybeDelete(bool set_closing = false) { + if (set_closing) { + node::Mutex::ScopedLock lock(this->mutex); + is_closing = true; + cond->Signal(lock); + } + if (handles_closing) { + return; + } + handles_closing = true; + uv_close( + reinterpret_cast(&async), + [] (uv_handle_t* handle) -> void { + TsFn* ts_fn = node::ContainerOf(&TsFn::async, + reinterpret_cast(handle)); + uv_close( + reinterpret_cast(&ts_fn->idle), + [] (uv_handle_t* handle) -> void { + TsFn* ts_fn = node::ContainerOf(&TsFn::idle, + reinterpret_cast(handle)); + ts_fn->Finalize(); + }); + }); + } + + // Default way of calling into JavaScript. Used when TsFn is constructed + // without a call_js_cb_. + static void CallJs(napi_env env, napi_value cb, void* context, void* data) { + if (!(env == nullptr || cb == nullptr)) { + napi_value recv; + napi_status status; + + status = napi_get_undefined(env, &recv); + if (status != napi_ok) { + napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED", + "Failed to retrieve undefined value"); + return; + } + + status = napi_call_function(env, recv, cb, 0, nullptr, nullptr); + if (status != napi_ok && status != napi_pending_exception) { + napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS", + "Failed to call JS callback"); + return; + } + } + } + + static void IdleCb(uv_idle_t* idle) { + TsFn* ts_fn = + node::ContainerOf(&TsFn::idle, idle); + ts_fn->DispatchOne(); + } + + static void AsyncCb(uv_async_t* async) { + TsFn* ts_fn = + node::ContainerOf(&TsFn::async, async); + ts_fn->MaybeStartIdle(); + } + + static void Cleanup(void* data) { + reinterpret_cast(data)->CloseHandlesAndMaybeDelete(true); + } + + private: + // These are variables protected by the mutex. + node::Mutex mutex; + std::unique_ptr cond; + std::queue queue; + uv_async_t async; + uv_idle_t idle; + size_t thread_count; + bool is_closing; + + // These are variables set once, upon creation, and then never again, which + // means we don't need the mutex to read them. + void* context; + size_t max_queue_size; + + // These are variables accessed only from the loop thread. + v8::Persistent ref; + napi_env env; + void* finalize_data; + napi_finalize finalize_cb; + bool idle_running; + napi_async_context async_context; + napi_threadsafe_function_call_js call_js_cb; + bool handles_closing; +}; + +NAPI_EXTERN napi_status +napi_create_threadsafe_function(napi_env env, + napi_value func, + napi_value async_resource, + napi_value async_resource_name, + size_t max_queue_size, + size_t initial_thread_count, + void* thread_finalize_data, + napi_finalize thread_finalize_cb, + void* context, + napi_threadsafe_function_call_js call_js_cb, + napi_threadsafe_function* result) { + CHECK_ENV(env); + CHECK_ARG(env, func); + CHECK_ARG(env, async_resource_name); + RETURN_STATUS_IF_FALSE(env, initial_thread_count > 0, napi_invalid_arg); + CHECK_ARG(env, result); + + napi_status status = napi_ok; + + v8::Local v8_func; + CHECK_TO_FUNCTION(env, v8_func, func); + + v8::Local v8_context = env->isolate->GetCurrentContext(); + + v8::Local v8_resource; + if (async_resource == nullptr) { + v8_resource = v8::Object::New(env->isolate); + } else { + CHECK_TO_OBJECT(env, v8_context, v8_resource, async_resource); + } + + v8::Local v8_name; + CHECK_TO_STRING(env, v8_context, v8_name, async_resource_name); + + TsFn* ts_fn = new TsFn(v8_func, + v8_resource, + v8_name, + initial_thread_count, + context, + max_queue_size, + env, + thread_finalize_data, + thread_finalize_cb, + call_js_cb); + + if (ts_fn == nullptr) { + status = napi_generic_failure; + } else { + // Init deletes ts_fn upon failure. + status = ts_fn->Init(); + if (status == napi_ok) { + *result = reinterpret_cast(ts_fn); + } + } + + return napi_set_last_error(env, status); +} + +NAPI_EXTERN napi_status +napi_get_threadsafe_function_context(napi_threadsafe_function func, + void** result) { + CHECK(func != nullptr); + CHECK(result != nullptr); + + *result = reinterpret_cast(func)->Context(); + return napi_ok; +} + +NAPI_EXTERN napi_status +napi_call_threadsafe_function(napi_threadsafe_function func, + void* data, + napi_threadsafe_function_call_mode is_blocking) { + CHECK(func != nullptr); + return reinterpret_cast(func)->Push(data, is_blocking); +} + +NAPI_EXTERN napi_status +napi_acquire_threadsafe_function(napi_threadsafe_function func) { + CHECK(func != nullptr); + return reinterpret_cast(func)->Acquire(); +} + +NAPI_EXTERN napi_status +napi_release_threadsafe_function(napi_threadsafe_function func, + napi_threadsafe_function_release_mode mode) { + CHECK(func != nullptr); + return reinterpret_cast(func)->Release(mode); +} + +NAPI_EXTERN napi_status +napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func) { + CHECK(func != nullptr); + return reinterpret_cast(func)->Unref(); +} + +NAPI_EXTERN napi_status +napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func) { + CHECK(func != nullptr); + return reinterpret_cast(func)->Ref(); +} diff --git a/src/node_api.h b/src/node_api.h index 6e6c575b76609a..0868eabc5c6961 100644 --- a/src/node_api.h +++ b/src/node_api.h @@ -595,6 +595,44 @@ NAPI_EXTERN napi_status napi_run_script(napi_env env, NAPI_EXTERN napi_status napi_get_uv_event_loop(napi_env env, struct uv_loop_s** loop); +#ifdef NAPI_EXPERIMENTAL +// Calling into JS from other threads +NAPI_EXTERN napi_status +napi_create_threadsafe_function(napi_env env, + napi_value func, + napi_value async_resource, + napi_value async_resource_name, + size_t max_queue_size, + size_t initial_thread_count, + void* thread_finalize_data, + napi_finalize thread_finalize_cb, + void* context, + napi_threadsafe_function_call_js call_js_cb, + napi_threadsafe_function* result); + +NAPI_EXTERN napi_status +napi_get_threadsafe_function_context(napi_threadsafe_function func, + void** result); + +NAPI_EXTERN napi_status +napi_call_threadsafe_function(napi_threadsafe_function func, + void* data, + napi_threadsafe_function_call_mode is_blocking); + +NAPI_EXTERN napi_status +napi_acquire_threadsafe_function(napi_threadsafe_function func); + +NAPI_EXTERN napi_status +napi_release_threadsafe_function(napi_threadsafe_function func, + napi_threadsafe_function_release_mode mode); + +NAPI_EXTERN napi_status +napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func); + +NAPI_EXTERN napi_status +napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func); + +#endif // NAPI_EXPERIMENTAL EXTERN_C_END #endif // SRC_NODE_API_H_ diff --git a/src/node_api_types.h b/src/node_api_types.h index 76f38802e83e2e..cfc2b879ab31ef 100644 --- a/src/node_api_types.h +++ b/src/node_api_types.h @@ -20,6 +20,9 @@ typedef struct napi_callback_info__ *napi_callback_info; typedef struct napi_async_context__ *napi_async_context; typedef struct napi_async_work__ *napi_async_work; typedef struct napi_deferred__ *napi_deferred; +#ifdef NAPI_EXPERIMENTAL +typedef struct napi_threadsafe_function__* napi_threadsafe_function; +#endif // NAPI_EXPERIMENTAL typedef enum { napi_default = 0, @@ -72,9 +75,25 @@ typedef enum { napi_cancelled, napi_escape_called_twice, napi_handle_scope_mismatch, - napi_callback_scope_mismatch + napi_callback_scope_mismatch, +#ifdef NAPI_EXPERIMENTAL + napi_queue_full, + napi_closing, +#endif // NAPI_EXPERIMENTAL } napi_status; +#ifdef NAPI_EXPERIMENTAL +typedef enum { + napi_tsfn_release, + napi_tsfn_abort +} napi_threadsafe_function_release_mode; + +typedef enum { + napi_tsfn_nonblocking, + napi_tsfn_blocking +} napi_threadsafe_function_call_mode; +#endif // NAPI_EXPERIMENTAL + typedef napi_value (*napi_callback)(napi_env env, napi_callback_info info); typedef void (*napi_finalize)(napi_env env, @@ -86,6 +105,13 @@ typedef void (*napi_async_complete_callback)(napi_env env, napi_status status, void* data); +#ifdef NAPI_EXPERIMENTAL +typedef void (*napi_threadsafe_function_call_js)(napi_env env, + napi_value js_callback, + void* context, + void* data); +#endif // NAPI_EXPERIMENTAL + typedef struct { // One of utf8name or name should be NULL. const char* utf8name; diff --git a/test/addons-napi/test_threadsafe_function/binding.c b/test/addons-napi/test_threadsafe_function/binding.c new file mode 100644 index 00000000000000..551705b1f21074 --- /dev/null +++ b/test/addons-napi/test_threadsafe_function/binding.c @@ -0,0 +1,254 @@ +// For the purpose of this test we use libuv's threading library. When deciding +// on a threading library for a new project it bears remembering that in the +// future libuv may introduce API changes which may render it non-ABI-stable, +// which, in turn, may affect the ABI stability of the project despite its use +// of N-API. +#include +#define NAPI_EXPERIMENTAL +#include +#include "../common.h" + +#define ARRAY_LENGTH 10 + +static uv_thread_t uv_threads[2]; +static napi_threadsafe_function ts_fn; + +typedef struct { + napi_threadsafe_function_call_mode block_on_full; + napi_threadsafe_function_release_mode abort; + bool start_secondary; + napi_ref js_finalize_cb; +} ts_fn_hint; + +static ts_fn_hint ts_info; + +// Thread data to transmit to JS +static int ints[ARRAY_LENGTH]; + +static void secondary_thread(void* data) { + napi_threadsafe_function ts_fn = data; + + if (napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) { + napi_fatal_error("secondary_thread", NAPI_AUTO_LENGTH, + "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH); + } +} + +// Source thread producing the data +static void data_source_thread(void* data) { + napi_threadsafe_function ts_fn = data; + int index; + void* hint; + ts_fn_hint *ts_fn_info; + napi_status status; + bool queue_was_full = false; + bool queue_was_closing = false; + + if (napi_get_threadsafe_function_context(ts_fn, &hint) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_get_threadsafe_function_context failed", NAPI_AUTO_LENGTH); + } + + ts_fn_info = (ts_fn_hint *)hint; + + if (ts_fn_info != &ts_info) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "thread-safe function hint is not as expected", NAPI_AUTO_LENGTH); + } + + if (ts_fn_info->start_secondary) { + if (napi_acquire_threadsafe_function(ts_fn) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_acquire_threadsafe_function failed", NAPI_AUTO_LENGTH); + } + + if (uv_thread_create(&uv_threads[1], secondary_thread, ts_fn) != 0) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "failed to start secondary thread", NAPI_AUTO_LENGTH); + } + } + + for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) { + status = napi_call_threadsafe_function(ts_fn, &ints[index], + ts_fn_info->block_on_full); + switch (status) { + case napi_queue_full: + queue_was_full = true; + index++; + // fall through + + case napi_ok: + continue; + + case napi_closing: + queue_was_closing = true; + break; + + default: + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_call_threadsafe_function failed", NAPI_AUTO_LENGTH); + } + } + + // Assert that the enqueuing of a value was refused at least once, if this is + // a non-blocking test run. + if (!ts_fn_info->block_on_full && !queue_was_full) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "queue was never full", NAPI_AUTO_LENGTH); + } + + // Assert that the queue was marked as closing at least once, if this is an + // aborting test run. + if (ts_fn_info->abort == napi_tsfn_abort && !queue_was_closing) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "queue was never closing", NAPI_AUTO_LENGTH); + } + + if (!queue_was_closing && + napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH); + } +} + +// Getting the data into JS +static void call_js(napi_env env, napi_value cb, void* hint, void* data) { + if (!(env == NULL || cb == NULL)) { + napi_value argv, undefined; + NAPI_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv)); + NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined)); + NAPI_CALL_RETURN_VOID(env, napi_call_function(env, undefined, cb, 1, &argv, + NULL)); + } +} + +// Cleanup +static napi_value StopThread(napi_env env, napi_callback_info info) { + size_t argc = 2; + napi_value argv[2]; + NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL)); + napi_valuetype value_type; + NAPI_CALL(env, napi_typeof(env, argv[0], &value_type)); + NAPI_ASSERT(env, value_type == napi_function, + "StopThread argument is a function"); + NAPI_ASSERT(env, (ts_fn != NULL), "Existing threadsafe function"); + NAPI_CALL(env, + napi_create_reference(env, argv[0], 1, &(ts_info.js_finalize_cb))); + bool abort; + NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort)); + NAPI_CALL(env, + napi_release_threadsafe_function(ts_fn, + abort ? napi_tsfn_abort : napi_tsfn_release)); + ts_fn = NULL; + return NULL; +} + +// Join the thread and inform JS that we're done. +static void join_the_threads(napi_env env, void *data, void *hint) { + uv_thread_t *the_threads = data; + ts_fn_hint *the_hint = hint; + napi_value js_cb, undefined; + + uv_thread_join(&the_threads[0]); + if (the_hint->start_secondary) { + uv_thread_join(&the_threads[1]); + } + + NAPI_CALL_RETURN_VOID(env, + napi_get_reference_value(env, the_hint->js_finalize_cb, &js_cb)); + NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined)); + NAPI_CALL_RETURN_VOID(env, + napi_call_function(env, undefined, js_cb, 0, NULL, NULL)); + NAPI_CALL_RETURN_VOID(env, napi_delete_reference(env, + the_hint->js_finalize_cb)); +} + +static napi_value StartThreadInternal(napi_env env, + napi_callback_info info, + napi_threadsafe_function_call_js cb, + bool block_on_full) { + size_t argc = 3; + napi_value argv[3]; + + ts_info.block_on_full = + (block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking); + + NAPI_ASSERT(env, (ts_fn == NULL), "Existing thread-safe function"); + NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL)); + napi_value async_name; + NAPI_CALL(env, napi_create_string_utf8(env, "N-API Thread-safe Function Test", + NAPI_AUTO_LENGTH, &async_name)); + NAPI_CALL(env, napi_create_threadsafe_function(env, argv[0], NULL, async_name, + 2, 2, uv_threads, join_the_threads, &ts_info, cb, &ts_fn)); + bool abort; + NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort)); + ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release; + NAPI_CALL(env, napi_get_value_bool(env, argv[2], &(ts_info.start_secondary))); + + NAPI_ASSERT(env, + (uv_thread_create(&uv_threads[0], data_source_thread, ts_fn) == 0), + "Thread creation"); + + return NULL; +} + +static napi_value Unref(napi_env env, napi_callback_info info) { + NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function"); + NAPI_CALL(env, napi_unref_threadsafe_function(env, ts_fn)); + return NULL; +} + +static napi_value Release(napi_env env, napi_callback_info info) { + NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function"); + NAPI_CALL(env, napi_release_threadsafe_function(ts_fn, napi_tsfn_release)); + return NULL; +} + +// Startup +static napi_value StartThread(napi_env env, napi_callback_info info) { + return StartThreadInternal(env, info, call_js, true); +} + +static napi_value StartThreadNonblocking(napi_env env, + napi_callback_info info) { + return StartThreadInternal(env, info, call_js, false); +} + +static napi_value StartThreadNoNative(napi_env env, napi_callback_info info) { + return StartThreadInternal(env, info, NULL, true); +} + +// Module init +static napi_value Init(napi_env env, napi_value exports) { + size_t index; + for (index = 0; index < ARRAY_LENGTH; index++) { + ints[index] = index; + } + napi_value js_array_length; + napi_create_uint32(env, ARRAY_LENGTH, &js_array_length); + + napi_property_descriptor properties[] = { + { + "ARRAY_LENGTH", + NULL, + NULL, + NULL, + NULL, + js_array_length, + napi_enumerable, + NULL + }, + DECLARE_NAPI_PROPERTY("StartThread", StartThread), + DECLARE_NAPI_PROPERTY("StartThreadNoNative", StartThreadNoNative), + DECLARE_NAPI_PROPERTY("StartThreadNonblocking", StartThreadNonblocking), + DECLARE_NAPI_PROPERTY("StopThread", StopThread), + DECLARE_NAPI_PROPERTY("Unref", Unref), + DECLARE_NAPI_PROPERTY("Release", Release), + }; + + NAPI_CALL(env, napi_define_properties(env, exports, + sizeof(properties)/sizeof(properties[0]), properties)); + + return exports; +} +NAPI_MODULE(NODE_GYP_MODULE_NAME, Init) diff --git a/test/addons-napi/test_threadsafe_function/binding.gyp b/test/addons-napi/test_threadsafe_function/binding.gyp new file mode 100644 index 00000000000000..b60352e05af103 --- /dev/null +++ b/test/addons-napi/test_threadsafe_function/binding.gyp @@ -0,0 +1,8 @@ +{ + 'targets': [ + { + 'target_name': 'binding', + 'sources': ['binding.c'] + } + ] +} diff --git a/test/addons-napi/test_threadsafe_function/test.js b/test/addons-napi/test_threadsafe_function/test.js new file mode 100644 index 00000000000000..8d8a6d9d8c6827 --- /dev/null +++ b/test/addons-napi/test_threadsafe_function/test.js @@ -0,0 +1,166 @@ +'use strict'; + +const common = require('../../common'); +const assert = require('assert'); +const binding = require(`./build/${common.buildType}/binding`); +const { fork } = require('child_process'); +const expectedArray = (function(arrayLength) { + const result = []; + for (let index = 0; index < arrayLength; index++) { + result.push(arrayLength - 1 - index); + } + return result; +})(binding.ARRAY_LENGTH); + +common.crashOnUnhandledRejection(); + +// Handle the rapid teardown test case as the child process. We unref the +// thread-safe function after we have received two values. This causes the +// process to exit and the environment cleanup handler to be invoked. +if (process.argv[2] === 'child') { + let callCount = 0; + binding.StartThread((value) => { + callCount++; + console.log(value); + if (callCount === 2) { + binding.Unref(); + } + }, false /* abort */, true /* launchSecondary */); + + // Release the thread-safe function from the main thread so that it may be + // torn down via the environment cleanup handler. + binding.Release(); + return; +} + +function testWithJSMarshaller({ + threadStarter, + quitAfter, + abort, + launchSecondary }) { + return new Promise((resolve) => { + const array = []; + binding[threadStarter](function testCallback(value) { + array.push(value); + if (array.length === quitAfter) { + setImmediate(() => { + binding.StopThread(common.mustCall(() => { + resolve(array); + }), !!abort); + }); + } + }, !!abort, !!launchSecondary); + if (threadStarter === 'StartThreadNonblocking') { + // Let's make this thread really busy for a short while to ensure that + // the queue fills and the thread receives a napi_queue_full. + const start = Date.now(); + while (Date.now() - start < 200); + } + }); +} + +new Promise(function testWithoutJSMarshaller(resolve) { + let callCount = 0; + binding.StartThreadNoNative(function testCallback() { + callCount++; + + // The default call-into-JS implementation passes no arguments. + assert.strictEqual(arguments.length, 0); + if (callCount === binding.ARRAY_LENGTH) { + setImmediate(() => { + binding.StopThread(common.mustCall(() => { + resolve(); + }), false); + }); + } + }, false /* abort */, false /* launchSecondary */); +}) + +// Start the thread in blocking mode, and assert that all values are passed. +// Quit after it's done. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: binding.ARRAY_LENGTH +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in non-blocking mode, and assert that all values are passed. +// Quit after it's done. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + quitAfter: binding.ARRAY_LENGTH +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: 1 +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in non-blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + quitAfter: 1 +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. Launch a secondary thread to test the +// reference counter incrementing functionality. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: 1, + launchSecondary: true +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in non-blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. Launch a secondary thread to test the +// reference counter incrementing functionality. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + quitAfter: 1, + launchSecondary: true +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode, and assert that it could not finish. +// Quit early and aborting. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: 1, + abort: true +})) +.then((result) => assert.strictEqual(result.indexOf(0), -1)) + +// Start the thread in non-blocking mode, and assert that it could not finish. +// Quit early and aborting. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + quitAfter: 1, + abort: true +})) +.then((result) => assert.strictEqual(result.indexOf(0), -1)) + +// Start a child process to test rapid teardown +.then(() => { + return new Promise((resolve, reject) => { + let output = ''; + const child = fork(__filename, ['child'], { + stdio: [process.stdin, 'pipe', process.stderr, 'ipc'] + }); + child.on('close', (code) => { + if (code === 0) { + resolve(output.match(/\S+/g)); + } else { + reject(new Error('Child process died with code ' + code)); + } + }); + child.stdout.on('data', (data) => (output += data.toString())); + }); +}) +.then((result) => assert.strictEqual(result.indexOf(0), -1)); From b285f1d2267b4ad5f8545da8b0de40eb84943ead Mon Sep 17 00:00:00 2001 From: cjihrig Date: Fri, 29 Jun 2018 19:52:46 -0400 Subject: [PATCH 02/10] n-api: fix compiler warning private field 'async_context' is not used [-Wunused-private-field] PR-URL: https://github.com/nodejs/node/pull/21597 Refs: https://github.com/nodejs/node/pull/17887 Reviewed-By: Gabriel Schulhof Reviewed-By: Daniel Bevenius --- src/node_api.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/node_api.cc b/src/node_api.cc index 20fb2f8c9a0af8..e9d30eeef6454e 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -3920,7 +3920,6 @@ class TsFn: public node::AsyncResource { void* finalize_data; napi_finalize finalize_cb; bool idle_running; - napi_async_context async_context; napi_threadsafe_function_call_js call_js_cb; bool handles_closing; }; From 7600c04df2306f881ea96dfe4e31f2e7803a132a Mon Sep 17 00:00:00 2001 From: Gabriel Schulhof Date: Wed, 18 Jul 2018 09:33:51 -0400 Subject: [PATCH 03/10] n-api: guard against cond null dereference A condition variable is only created by the thread-safe function if the queue size is set to something larger than zero. This adds null-checks around the condition variable and tests for the case where the queue size is zero. Fixes: https://github.com/nodejs/help/issues/1387 PR-URL: https://github.com/nodejs/node/pull/21871 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Colin Ihrig Reviewed-By: Michael Dawson --- src/node_api.cc | 10 ++- .../test_threadsafe_function/binding.c | 39 +++++++-- .../test_threadsafe_function/test.js | 84 ++++++++++++++----- 3 files changed, 104 insertions(+), 29 deletions(-) diff --git a/src/node_api.cc b/src/node_api.cc index e9d30eeef6454e..4628ce079f4513 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -3683,7 +3683,7 @@ class TsFn: public node::AsyncResource { if (thread_count == 0 || mode == napi_tsfn_abort) { if (!is_closing) { is_closing = (mode == napi_tsfn_abort); - if (is_closing) { + if (is_closing && max_queue_size > 0) { cond->Signal(lock); } if (uv_async_send(&async) != 0) { @@ -3772,7 +3772,9 @@ class TsFn: public node::AsyncResource { if (size == 0) { if (thread_count == 0) { is_closing = true; - cond->Signal(lock); + if (max_queue_size > 0) { + cond->Signal(lock); + } CloseHandlesAndMaybeDelete(); } else { if (uv_idle_stop(&idle) != 0) { @@ -3839,7 +3841,9 @@ class TsFn: public node::AsyncResource { if (set_closing) { node::Mutex::ScopedLock lock(this->mutex); is_closing = true; - cond->Signal(lock); + if (max_queue_size > 0) { + cond->Signal(lock); + } } if (handles_closing) { return; diff --git a/test/addons-napi/test_threadsafe_function/binding.c b/test/addons-napi/test_threadsafe_function/binding.c index 551705b1f21074..354012a288b3ca 100644 --- a/test/addons-napi/test_threadsafe_function/binding.c +++ b/test/addons-napi/test_threadsafe_function/binding.c @@ -9,6 +9,7 @@ #include "../common.h" #define ARRAY_LENGTH 10 +#define MAX_QUEUE_SIZE 2 static uv_thread_t uv_threads[2]; static napi_threadsafe_function ts_fn; @@ -18,6 +19,7 @@ typedef struct { napi_threadsafe_function_release_mode abort; bool start_secondary; napi_ref js_finalize_cb; + uint32_t max_queue_size; } ts_fn_hint; static ts_fn_hint ts_info; @@ -71,6 +73,12 @@ static void data_source_thread(void* data) { for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) { status = napi_call_threadsafe_function(ts_fn, &ints[index], ts_fn_info->block_on_full); + if (ts_fn_info->max_queue_size == 0) { + // Let's make this thread really busy for 200 ms to give the main thread a + // chance to abort. + uint64_t start = uv_hrtime(); + for (; uv_hrtime() - start < 200000000;); + } switch (status) { case napi_queue_full: queue_was_full = true; @@ -167,8 +175,8 @@ static napi_value StartThreadInternal(napi_env env, napi_callback_info info, napi_threadsafe_function_call_js cb, bool block_on_full) { - size_t argc = 3; - napi_value argv[3]; + size_t argc = 4; + napi_value argv[4]; ts_info.block_on_full = (block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking); @@ -178,8 +186,18 @@ static napi_value StartThreadInternal(napi_env env, napi_value async_name; NAPI_CALL(env, napi_create_string_utf8(env, "N-API Thread-safe Function Test", NAPI_AUTO_LENGTH, &async_name)); - NAPI_CALL(env, napi_create_threadsafe_function(env, argv[0], NULL, async_name, - 2, 2, uv_threads, join_the_threads, &ts_info, cb, &ts_fn)); + NAPI_CALL(env, napi_get_value_uint32(env, argv[3], &ts_info.max_queue_size)); + NAPI_CALL(env, napi_create_threadsafe_function(env, + argv[0], + NULL, + async_name, + ts_info.max_queue_size, + 2, + uv_threads, + join_the_threads, + &ts_info, + cb, + &ts_fn)); bool abort; NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort)); ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release; @@ -224,8 +242,9 @@ static napi_value Init(napi_env env, napi_value exports) { for (index = 0; index < ARRAY_LENGTH; index++) { ints[index] = index; } - napi_value js_array_length; + napi_value js_array_length, js_max_queue_size; napi_create_uint32(env, ARRAY_LENGTH, &js_array_length); + napi_create_uint32(env, MAX_QUEUE_SIZE, &js_max_queue_size); napi_property_descriptor properties[] = { { @@ -238,6 +257,16 @@ static napi_value Init(napi_env env, napi_value exports) { napi_enumerable, NULL }, + { + "MAX_QUEUE_SIZE", + NULL, + NULL, + NULL, + NULL, + js_max_queue_size, + napi_enumerable, + NULL + }, DECLARE_NAPI_PROPERTY("StartThread", StartThread), DECLARE_NAPI_PROPERTY("StartThreadNoNative", StartThreadNoNative), DECLARE_NAPI_PROPERTY("StartThreadNonblocking", StartThreadNonblocking), diff --git a/test/addons-napi/test_threadsafe_function/test.js b/test/addons-napi/test_threadsafe_function/test.js index 8d8a6d9d8c6827..ac2549a8c08e5c 100644 --- a/test/addons-napi/test_threadsafe_function/test.js +++ b/test/addons-napi/test_threadsafe_function/test.js @@ -25,7 +25,7 @@ if (process.argv[2] === 'child') { if (callCount === 2) { binding.Unref(); } - }, false /* abort */, true /* launchSecondary */); + }, false /* abort */, true /* launchSecondary */, +process.argv[3]); // Release the thread-safe function from the main thread so that it may be // torn down via the environment cleanup handler. @@ -37,6 +37,7 @@ function testWithJSMarshaller({ threadStarter, quitAfter, abort, + maxQueueSize, launchSecondary }) { return new Promise((resolve) => { const array = []; @@ -49,7 +50,7 @@ function testWithJSMarshaller({ }), !!abort); }); } - }, !!abort, !!launchSecondary); + }, !!abort, !!launchSecondary, maxQueueSize); if (threadStarter === 'StartThreadNonblocking') { // Let's make this thread really busy for a short while to ensure that // the queue fills and the thread receives a napi_queue_full. @@ -59,6 +60,24 @@ function testWithJSMarshaller({ }); } +function testUnref(queueSize) { + return new Promise((resolve, reject) => { + let output = ''; + const child = fork(__filename, ['child', queueSize], { + stdio: [process.stdin, 'pipe', process.stderr, 'ipc'] + }); + child.on('close', (code) => { + if (code === 0) { + resolve(output.match(/\S+/g)); + } else { + reject(new Error('Child process died with code ' + code)); + } + }); + child.stdout.on('data', (data) => (output += data.toString())); + }) + .then((result) => assert.strictEqual(result.indexOf(0), -1)); +} + new Promise(function testWithoutJSMarshaller(resolve) { let callCount = 0; binding.StartThreadNoNative(function testCallback() { @@ -73,13 +92,23 @@ new Promise(function testWithoutJSMarshaller(resolve) { }), false); }); } - }, false /* abort */, false /* launchSecondary */); + }, false /* abort */, false /* launchSecondary */, binding.MAX_QUEUE_SIZE); }) // Start the thread in blocking mode, and assert that all values are passed. // Quit after it's done. .then(() => testWithJSMarshaller({ threadStarter: 'StartThread', + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: binding.ARRAY_LENGTH +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode with an infinite queue, and assert that all +// values are passed. Quit after it's done. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + maxQueueSize: 0, quitAfter: binding.ARRAY_LENGTH })) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -88,6 +117,7 @@ new Promise(function testWithoutJSMarshaller(resolve) { // Quit after it's done. .then(() => testWithJSMarshaller({ threadStarter: 'StartThreadNonblocking', + maxQueueSize: binding.MAX_QUEUE_SIZE, quitAfter: binding.ARRAY_LENGTH })) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -96,6 +126,16 @@ new Promise(function testWithoutJSMarshaller(resolve) { // Quit early, but let the thread finish. .then(() => testWithJSMarshaller({ threadStarter: 'StartThread', + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: 1 +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode with an infinite queue, and assert that all +// values are passed. Quit early, but let the thread finish. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + maxQueueSize: 0, quitAfter: 1 })) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -104,6 +144,7 @@ new Promise(function testWithoutJSMarshaller(resolve) { // Quit early, but let the thread finish. .then(() => testWithJSMarshaller({ threadStarter: 'StartThreadNonblocking', + maxQueueSize: binding.MAX_QUEUE_SIZE, quitAfter: 1 })) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -114,6 +155,7 @@ new Promise(function testWithoutJSMarshaller(resolve) { .then(() => testWithJSMarshaller({ threadStarter: 'StartThread', quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, launchSecondary: true })) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -124,15 +166,27 @@ new Promise(function testWithoutJSMarshaller(resolve) { .then(() => testWithJSMarshaller({ threadStarter: 'StartThreadNonblocking', quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, launchSecondary: true })) .then((result) => assert.deepStrictEqual(result, expectedArray)) // Start the thread in blocking mode, and assert that it could not finish. -// Quit early and aborting. +// Quit early by aborting. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, + abort: true +})) +.then((result) => assert.strictEqual(result.indexOf(0), -1)) + +// Start the thread in blocking mode with an infinite queue, and assert that it +// could not finish. Quit early by aborting. .then(() => testWithJSMarshaller({ threadStarter: 'StartThread', quitAfter: 1, + maxQueueSize: 0, abort: true })) .then((result) => assert.strictEqual(result.indexOf(0), -1)) @@ -142,25 +196,13 @@ new Promise(function testWithoutJSMarshaller(resolve) { .then(() => testWithJSMarshaller({ threadStarter: 'StartThreadNonblocking', quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, abort: true })) .then((result) => assert.strictEqual(result.indexOf(0), -1)) // Start a child process to test rapid teardown -.then(() => { - return new Promise((resolve, reject) => { - let output = ''; - const child = fork(__filename, ['child'], { - stdio: [process.stdin, 'pipe', process.stderr, 'ipc'] - }); - child.on('close', (code) => { - if (code === 0) { - resolve(output.match(/\S+/g)); - } else { - reject(new Error('Child process died with code ' + code)); - } - }); - child.stdout.on('data', (data) => (output += data.toString())); - }); -}) -.then((result) => assert.strictEqual(result.indexOf(0), -1)); +.then(() => testUnref(binding.MAX_QUEUE_SIZE)) + +// Start a child process with an infinite queue to test rapid teardown +.then(() => testUnref(0)); From 796576528895acaef13ffe4c5fc24d9127d844b6 Mon Sep 17 00:00:00 2001 From: Michael Dawson Date: Thu, 19 Jul 2018 18:15:01 -0400 Subject: [PATCH 04/10] src: fix may be uninitialized warning in n-api PR-URL: https://github.com/nodejs/node/pull/21898 Reviewed-By: Anna Henningsen Reviewed-By: Colin Ihrig Reviewed-By: James M Snell Reviewed-By: Trivikram Kamat Reviewed-By: Jon Moss Reviewed-By: Luigi Pinca --- src/node_api.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/node_api.cc b/src/node_api.cc index 4628ce079f4513..2421b9c333dbeb 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -3749,7 +3749,7 @@ class TsFn: public node::AsyncResource { } void DispatchOne() { - void* data; + void* data = nullptr; bool popped_value = false; bool idle_stop_failed = false; From 46118be491f4f0b859a49dfab965394ccd612d76 Mon Sep 17 00:00:00 2001 From: Lars-Magnus Skog Date: Fri, 27 Jul 2018 16:17:32 +0200 Subject: [PATCH 05/10] n-api: remove idle_running from TsFn The idle_running member variable in TsFn is always false and can therefore be removed. PR-URL: https://github.com/nodejs/node/pull/22520 Reviewed-By: James M Snell Reviewed-By: Anna Henningsen Reviewed-By: Gabriel Schulhof Reviewed-By: Gus Caplan --- src/node_api.cc | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/src/node_api.cc b/src/node_api.cc index 2421b9c333dbeb..89a51162d0ff88 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -3614,7 +3614,6 @@ class TsFn: public node::AsyncResource { env(env_), finalize_data(finalize_data_), finalize_cb(finalize_cb_), - idle_running(false), call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_), handles_closing(false) { ref.Reset(env->isolate, func); @@ -3779,8 +3778,6 @@ class TsFn: public node::AsyncResource { } else { if (uv_idle_stop(&idle) != 0) { idle_stop_failed = true; - } else { - idle_running = false; } } } @@ -3813,14 +3810,12 @@ class TsFn: public node::AsyncResource { } void MaybeStartIdle() { - if (!idle_running) { - if (uv_idle_start(&idle, IdleCb) != 0) { - v8::HandleScope scope(env->isolate); - CallbackScope cb_scope(this); - CHECK(napi_throw_error(env, - "ERR_NAPI_TSFN_START_IDLE_LOOP", - "Failed to start the idle loop") == napi_ok); - } + if (uv_idle_start(&idle, IdleCb) != 0) { + v8::HandleScope scope(env->isolate); + CallbackScope cb_scope(this); + CHECK(napi_throw_error(env, + "ERR_NAPI_TSFN_START_IDLE_LOOP", + "Failed to start the idle loop") == napi_ok); } } @@ -3923,7 +3918,6 @@ class TsFn: public node::AsyncResource { napi_env env; void* finalize_data; napi_finalize finalize_cb; - bool idle_running; napi_threadsafe_function_call_js call_js_cb; bool handles_closing; }; From 35c6b1dfade51780cab1ab5ad97925e415af7d50 Mon Sep 17 00:00:00 2001 From: Gabriel Schulhof Date: Fri, 10 Aug 2018 23:22:34 -0400 Subject: [PATCH 06/10] n-api: clean up thread-safe function * Move class `TsFn` to name space `v8impl` and rename it to `ThreadSafeFunction` * Remove `NAPI_EXTERN` from API declarations, because it's only needed in the header file. PR-URL: https://github.com/nodejs/node/pull/22259 Reviewed-By: Anna Henningsen Reviewed-By: Kyle Farnung Reviewed-By: Michael Dawson --- src/node_api.cc | 1179 ++++++++++++++++++++++++----------------------- 1 file changed, 592 insertions(+), 587 deletions(-) diff --git a/src/node_api.cc b/src/node_api.cc index 89a51162d0ff88..a4408b70f24c10 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -856,180 +856,513 @@ napi_status ConcludeDeferred(napi_env env, return GET_RETURN_STATUS(env); } -} // end of namespace v8impl - -// Intercepts the Node-V8 module registration callback. Converts parameters -// to NAPI equivalents and then calls the registration callback specified -// by the NAPI module. -void napi_module_register_cb(v8::Local exports, - v8::Local module, - v8::Local context, - void* priv) { - napi_module* mod = static_cast(priv); +class ThreadSafeFunction : public node::AsyncResource { + public: + ThreadSafeFunction(v8::Local func, + v8::Local resource, + v8::Local name, + size_t thread_count_, + void* context_, + size_t max_queue_size_, + napi_env env_, + void* finalize_data_, + napi_finalize finalize_cb_, + napi_threadsafe_function_call_js call_js_cb_): + AsyncResource(env_->isolate, + resource, + *v8::String::Utf8Value(env_->isolate, name)), + thread_count(thread_count_), + is_closing(false), + context(context_), + max_queue_size(max_queue_size_), + env(env_), + finalize_data(finalize_data_), + finalize_cb(finalize_cb_), + call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_), + handles_closing(false) { + ref.Reset(env->isolate, func); + node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this); + } - if (mod->nm_register_func == nullptr) { - node::Environment::GetCurrent(context)->ThrowError( - "Module has no declared entry point."); - return; + ~ThreadSafeFunction() { + node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this); + if (ref.IsEmpty()) + return; + ref.ClearWeak(); + ref.Reset(); } - // Create a new napi_env for this module or reference one if a pre-existing - // one is found. - napi_env env = v8impl::GetEnv(context); + // These methods can be called from any thread. - napi_value _exports; - NAPI_CALL_INTO_MODULE_THROW(env, - _exports = mod->nm_register_func(env, - v8impl::JsValueFromV8LocalValue(exports))); + napi_status Push(void* data, napi_threadsafe_function_call_mode mode) { + node::Mutex::ScopedLock lock(this->mutex); - // If register function returned a non-null exports object different from - // the exports object we passed it, set that as the "exports" property of - // the module. - if (_exports != nullptr && - _exports != v8impl::JsValueFromV8LocalValue(exports)) { - napi_value _module = v8impl::JsValueFromV8LocalValue(module); - napi_set_named_property(env, _module, "exports", _exports); + while (queue.size() >= max_queue_size && + max_queue_size > 0 && + !is_closing) { + if (mode == napi_tsfn_nonblocking) { + return napi_queue_full; + } + cond->Wait(lock); + } + + if (is_closing) { + if (thread_count == 0) { + return napi_invalid_arg; + } else { + thread_count--; + return napi_closing; + } + } else { + if (uv_async_send(&async) != 0) { + return napi_generic_failure; + } + queue.push(data); + return napi_ok; + } } -} -} // end of anonymous namespace + napi_status Acquire() { + node::Mutex::ScopedLock lock(this->mutex); -// Registers a NAPI module. -void napi_module_register(napi_module* mod) { - node::node_module* nm = new node::node_module { - -1, - mod->nm_flags, - nullptr, - mod->nm_filename, - nullptr, - napi_module_register_cb, - mod->nm_modname, - mod, // priv - nullptr, - }; - node::node_module_register(nm); -} + if (is_closing) { + return napi_closing; + } -napi_status napi_add_env_cleanup_hook(napi_env env, - void (*fun)(void* arg), - void* arg) { - CHECK_ENV(env); - CHECK_ARG(env, fun); + thread_count++; - node::AddEnvironmentCleanupHook(env->isolate, fun, arg); + return napi_ok; + } - return napi_ok; -} + napi_status Release(napi_threadsafe_function_release_mode mode) { + node::Mutex::ScopedLock lock(this->mutex); -napi_status napi_remove_env_cleanup_hook(napi_env env, - void (*fun)(void* arg), - void* arg) { - CHECK_ENV(env); - CHECK_ARG(env, fun); + if (thread_count == 0) { + return napi_invalid_arg; + } - node::RemoveEnvironmentCleanupHook(env->isolate, fun, arg); + thread_count--; - return napi_ok; -} + if (thread_count == 0 || mode == napi_tsfn_abort) { + if (!is_closing) { + is_closing = (mode == napi_tsfn_abort); + if (is_closing && max_queue_size > 0) { + cond->Signal(lock); + } + if (uv_async_send(&async) != 0) { + return napi_generic_failure; + } + } + } -// Warning: Keep in-sync with napi_status enum -static -const char* error_messages[] = {nullptr, - "Invalid argument", - "An object was expected", - "A string was expected", - "A string or symbol was expected", - "A function was expected", - "A number was expected", - "A boolean was expected", - "An array was expected", - "Unknown failure", - "An exception is pending", - "The async work item was cancelled", - "napi_escape_handle already called on scope", - "Invalid handle scope usage", - "Invalid callback scope usage", - "Thread-safe function queue is full", - "Thread-safe function handle is closing" -}; + return napi_ok; + } -static inline napi_status napi_clear_last_error(napi_env env) { - env->last_error.error_code = napi_ok; + void EmptyQueueAndDelete() { + for (; !queue.empty() ; queue.pop()) { + call_js_cb(nullptr, nullptr, context, queue.front()); + } + delete this; + } - // TODO(boingoing): Should this be a callback? - env->last_error.engine_error_code = 0; - env->last_error.engine_reserved = nullptr; - return napi_ok; -} + // These methods must only be called from the loop thread. -static inline -napi_status napi_set_last_error(napi_env env, napi_status error_code, - uint32_t engine_error_code, - void* engine_reserved) { - env->last_error.error_code = error_code; - env->last_error.engine_error_code = engine_error_code; - env->last_error.engine_reserved = engine_reserved; - return error_code; -} + napi_status Init() { + ThreadSafeFunction* ts_fn = this; -napi_status napi_get_last_error_info(napi_env env, - const napi_extended_error_info** result) { - CHECK_ENV(env); - CHECK_ARG(env, result); + if (uv_async_init(env->loop, &async, AsyncCb) == 0) { + if (max_queue_size > 0) { + cond.reset(new node::ConditionVariable); + } + if ((max_queue_size == 0 || cond.get() != nullptr) && + uv_idle_init(env->loop, &idle) == 0) { + return napi_ok; + } - // you must update this assert to reference the last message - // in the napi_status enum each time a new error message is added. - // We don't have a napi_status_last as this would result in an ABI - // change each time a message was added. - static_assert( - node::arraysize(error_messages) == napi_closing + 1, - "Count of error messages must match count of error values"); - CHECK_LE(env->last_error.error_code, napi_callback_scope_mismatch); + uv_close(reinterpret_cast(&async), + [] (uv_handle_t* handle) -> void { + ThreadSafeFunction* ts_fn = + node::ContainerOf(&ThreadSafeFunction::async, + reinterpret_cast(handle)); + delete ts_fn; + }); - // Wait until someone requests the last error information to fetch the error - // message string - env->last_error.error_message = - error_messages[env->last_error.error_code]; + // Prevent the thread-safe function from being deleted here, because + // the callback above will delete it. + ts_fn = nullptr; + } - *result = &(env->last_error); - return napi_ok; -} + delete ts_fn; -napi_status napi_fatal_exception(napi_env env, napi_value err) { - NAPI_PREAMBLE(env); - CHECK_ARG(env, err); + return napi_generic_failure; + } - v8::Local local_err = v8impl::V8LocalValueFromJsValue(err); - v8impl::trigger_fatal_exception(env, local_err); + napi_status Unref() { + uv_unref(reinterpret_cast(&async)); + uv_unref(reinterpret_cast(&idle)); - return napi_clear_last_error(env); -} + return napi_ok; + } -NAPI_NO_RETURN void napi_fatal_error(const char* location, - size_t location_len, - const char* message, - size_t message_len) { - std::string location_string; - std::string message_string; + napi_status Ref() { + uv_ref(reinterpret_cast(&async)); + uv_ref(reinterpret_cast(&idle)); - if (location_len != NAPI_AUTO_LENGTH) { - location_string.assign( - const_cast(location), location_len); - } else { - location_string.assign( - const_cast(location), strlen(location)); + return napi_ok; } - if (message_len != NAPI_AUTO_LENGTH) { - message_string.assign( - const_cast(message), message_len); - } else { - message_string.assign( - const_cast(message), strlen(message)); - } + void DispatchOne() { + void* data = nullptr; + bool popped_value = false; + bool idle_stop_failed = false; - node::FatalError(location_string.c_str(), message_string.c_str()); -} + { + node::Mutex::ScopedLock lock(this->mutex); + if (is_closing) { + CloseHandlesAndMaybeDelete(); + } else { + size_t size = queue.size(); + if (size > 0) { + data = queue.front(); + queue.pop(); + popped_value = true; + if (size == max_queue_size && max_queue_size > 0) { + cond->Signal(lock); + } + size--; + } + + if (size == 0) { + if (thread_count == 0) { + is_closing = true; + if (max_queue_size > 0) { + cond->Signal(lock); + } + CloseHandlesAndMaybeDelete(); + } else { + if (uv_idle_stop(&idle) != 0) { + idle_stop_failed = true; + } + } + } + } + } + + if (popped_value || idle_stop_failed) { + v8::HandleScope scope(env->isolate); + CallbackScope cb_scope(this); + + if (idle_stop_failed) { + CHECK(napi_throw_error(env, + "ERR_NAPI_TSFN_STOP_IDLE_LOOP", + "Failed to stop the idle loop") == napi_ok); + } else { + v8::Local js_cb = + v8::Local::New(env->isolate, ref); + call_js_cb(env, + v8impl::JsValueFromV8LocalValue(js_cb), + context, + data); + } + } + } + + node::Environment* NodeEnv() { + // For some reason grabbing the Node.js environment requires a handle scope. + v8::HandleScope scope(env->isolate); + return node::Environment::GetCurrent(env->isolate); + } + + void MaybeStartIdle() { + if (uv_idle_start(&idle, IdleCb) != 0) { + v8::HandleScope scope(env->isolate); + CallbackScope cb_scope(this); + CHECK(napi_throw_error(env, + "ERR_NAPI_TSFN_START_IDLE_LOOP", + "Failed to start the idle loop") == napi_ok); + } + } + + void Finalize() { + v8::HandleScope scope(env->isolate); + if (finalize_cb) { + CallbackScope cb_scope(this); + finalize_cb(env, finalize_data, context); + } + EmptyQueueAndDelete(); + } + + inline void* Context() { + return context; + } + + void CloseHandlesAndMaybeDelete(bool set_closing = false) { + if (set_closing) { + node::Mutex::ScopedLock lock(this->mutex); + is_closing = true; + if (max_queue_size > 0) { + cond->Signal(lock); + } + } + if (handles_closing) { + return; + } + handles_closing = true; + uv_close( + reinterpret_cast(&async), + [] (uv_handle_t* handle) -> void { + ThreadSafeFunction* ts_fn = + node::ContainerOf(&ThreadSafeFunction::async, + reinterpret_cast(handle)); + uv_close( + reinterpret_cast(&ts_fn->idle), + [] (uv_handle_t* handle) -> void { + ThreadSafeFunction* ts_fn = + node::ContainerOf(&ThreadSafeFunction::idle, + reinterpret_cast(handle)); + ts_fn->Finalize(); + }); + }); + } + + // Default way of calling into JavaScript. Used when ThreadSafeFunction is + // constructed without a call_js_cb_. + static void CallJs(napi_env env, napi_value cb, void* context, void* data) { + if (!(env == nullptr || cb == nullptr)) { + napi_value recv; + napi_status status; + + status = napi_get_undefined(env, &recv); + if (status != napi_ok) { + napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED", + "Failed to retrieve undefined value"); + return; + } + + status = napi_call_function(env, recv, cb, 0, nullptr, nullptr); + if (status != napi_ok && status != napi_pending_exception) { + napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS", + "Failed to call JS callback"); + return; + } + } + } + + static void IdleCb(uv_idle_t* idle) { + ThreadSafeFunction* ts_fn = + node::ContainerOf(&ThreadSafeFunction::idle, idle); + ts_fn->DispatchOne(); + } + + static void AsyncCb(uv_async_t* async) { + ThreadSafeFunction* ts_fn = + node::ContainerOf(&ThreadSafeFunction::async, async); + ts_fn->MaybeStartIdle(); + } + + static void Cleanup(void* data) { + reinterpret_cast(data) + ->CloseHandlesAndMaybeDelete(true); + } + + private: + // These are variables protected by the mutex. + node::Mutex mutex; + std::unique_ptr cond; + std::queue queue; + uv_async_t async; + uv_idle_t idle; + size_t thread_count; + bool is_closing; + + // These are variables set once, upon creation, and then never again, which + // means we don't need the mutex to read them. + void* context; + size_t max_queue_size; + + // These are variables accessed only from the loop thread. + v8::Persistent ref; + napi_env env; + void* finalize_data; + napi_finalize finalize_cb; + napi_threadsafe_function_call_js call_js_cb; + bool handles_closing; +}; + +} // end of namespace v8impl + +// Intercepts the Node-V8 module registration callback. Converts parameters +// to NAPI equivalents and then calls the registration callback specified +// by the NAPI module. +void napi_module_register_cb(v8::Local exports, + v8::Local module, + v8::Local context, + void* priv) { + napi_module* mod = static_cast(priv); + + if (mod->nm_register_func == nullptr) { + node::Environment::GetCurrent(context)->ThrowError( + "Module has no declared entry point."); + return; + } + + // Create a new napi_env for this module or reference one if a pre-existing + // one is found. + napi_env env = v8impl::GetEnv(context); + + napi_value _exports; + NAPI_CALL_INTO_MODULE_THROW(env, + _exports = mod->nm_register_func(env, + v8impl::JsValueFromV8LocalValue(exports))); + + // If register function returned a non-null exports object different from + // the exports object we passed it, set that as the "exports" property of + // the module. + if (_exports != nullptr && + _exports != v8impl::JsValueFromV8LocalValue(exports)) { + napi_value _module = v8impl::JsValueFromV8LocalValue(module); + napi_set_named_property(env, _module, "exports", _exports); + } +} + +} // end of anonymous namespace + +// Registers a NAPI module. +void napi_module_register(napi_module* mod) { + node::node_module* nm = new node::node_module { + -1, + mod->nm_flags, + nullptr, + mod->nm_filename, + nullptr, + napi_module_register_cb, + mod->nm_modname, + mod, // priv + nullptr, + }; + node::node_module_register(nm); +} + +napi_status napi_add_env_cleanup_hook(napi_env env, + void (*fun)(void* arg), + void* arg) { + CHECK_ENV(env); + CHECK_ARG(env, fun); + + node::AddEnvironmentCleanupHook(env->isolate, fun, arg); + + return napi_ok; +} + +napi_status napi_remove_env_cleanup_hook(napi_env env, + void (*fun)(void* arg), + void* arg) { + CHECK_ENV(env); + CHECK_ARG(env, fun); + + node::RemoveEnvironmentCleanupHook(env->isolate, fun, arg); + + return napi_ok; +} + +// Warning: Keep in-sync with napi_status enum +static +const char* error_messages[] = {nullptr, + "Invalid argument", + "An object was expected", + "A string was expected", + "A string or symbol was expected", + "A function was expected", + "A number was expected", + "A boolean was expected", + "An array was expected", + "Unknown failure", + "An exception is pending", + "The async work item was cancelled", + "napi_escape_handle already called on scope", + "Invalid handle scope usage", + "Invalid callback scope usage", + "Thread-safe function queue is full", + "Thread-safe function handle is closing" +}; + +static inline napi_status napi_clear_last_error(napi_env env) { + env->last_error.error_code = napi_ok; + + // TODO(boingoing): Should this be a callback? + env->last_error.engine_error_code = 0; + env->last_error.engine_reserved = nullptr; + return napi_ok; +} + +static inline +napi_status napi_set_last_error(napi_env env, napi_status error_code, + uint32_t engine_error_code, + void* engine_reserved) { + env->last_error.error_code = error_code; + env->last_error.engine_error_code = engine_error_code; + env->last_error.engine_reserved = engine_reserved; + return error_code; +} + +napi_status napi_get_last_error_info(napi_env env, + const napi_extended_error_info** result) { + CHECK_ENV(env); + CHECK_ARG(env, result); + + // you must update this assert to reference the last message + // in the napi_status enum each time a new error message is added. + // We don't have a napi_status_last as this would result in an ABI + // change each time a message was added. + static_assert( + node::arraysize(error_messages) == napi_closing + 1, + "Count of error messages must match count of error values"); + CHECK_LE(env->last_error.error_code, napi_callback_scope_mismatch); + + // Wait until someone requests the last error information to fetch the error + // message string + env->last_error.error_message = + error_messages[env->last_error.error_code]; + + *result = &(env->last_error); + return napi_ok; +} + +napi_status napi_fatal_exception(napi_env env, napi_value err) { + NAPI_PREAMBLE(env); + CHECK_ARG(env, err); + + v8::Local local_err = v8impl::V8LocalValueFromJsValue(err); + v8impl::trigger_fatal_exception(env, local_err); + + return napi_clear_last_error(env); +} + +NAPI_NO_RETURN void napi_fatal_error(const char* location, + size_t location_len, + const char* message, + size_t message_len) { + std::string location_string; + std::string message_string; + + if (location_len != NAPI_AUTO_LENGTH) { + location_string.assign( + const_cast(location), location_len); + } else { + location_string.assign( + const_cast(location), strlen(location)); + } + + if (message_len != NAPI_AUTO_LENGTH) { + message_string.assign( + const_cast(message), message_len); + } else { + message_string.assign( + const_cast(message), strlen(message)); + } + + node::FatalError(location_string.c_str(), message_string.c_str()); +} napi_status napi_create_function(napi_env env, const char* utf8name, @@ -3473,456 +3806,126 @@ napi_status napi_create_async_work(napi_env env, *result = reinterpret_cast(work); return napi_clear_last_error(env); -} - -napi_status napi_delete_async_work(napi_env env, napi_async_work work) { - CHECK_ENV(env); - CHECK_ARG(env, work); - - uvimpl::Work::Delete(reinterpret_cast(work)); - - return napi_clear_last_error(env); -} - -napi_status napi_get_uv_event_loop(napi_env env, uv_loop_t** loop) { - CHECK_ENV(env); - CHECK_ARG(env, loop); - *loop = env->loop; - return napi_clear_last_error(env); -} - -napi_status napi_queue_async_work(napi_env env, napi_async_work work) { - CHECK_ENV(env); - CHECK_ARG(env, work); - - napi_status status; - uv_loop_t* event_loop = nullptr; - status = napi_get_uv_event_loop(env, &event_loop); - if (status != napi_ok) - return napi_set_last_error(env, status); - - uvimpl::Work* w = reinterpret_cast(work); - - CALL_UV(env, uv_queue_work(event_loop, - w->Request(), - uvimpl::Work::ExecuteCallback, - uvimpl::Work::CompleteCallback)); - - return napi_clear_last_error(env); -} - -napi_status napi_cancel_async_work(napi_env env, napi_async_work work) { - CHECK_ENV(env); - CHECK_ARG(env, work); - - uvimpl::Work* w = reinterpret_cast(work); - - CALL_UV(env, uv_cancel(reinterpret_cast(w->Request()))); - - return napi_clear_last_error(env); -} - -napi_status napi_create_promise(napi_env env, - napi_deferred* deferred, - napi_value* promise) { - NAPI_PREAMBLE(env); - CHECK_ARG(env, deferred); - CHECK_ARG(env, promise); - - auto maybe = v8::Promise::Resolver::New(env->isolate->GetCurrentContext()); - CHECK_MAYBE_EMPTY(env, maybe, napi_generic_failure); - - auto v8_resolver = maybe.ToLocalChecked(); - auto v8_deferred = new v8::Persistent(); - v8_deferred->Reset(env->isolate, v8_resolver); - - *deferred = v8impl::JsDeferredFromV8Persistent(v8_deferred); - *promise = v8impl::JsValueFromV8LocalValue(v8_resolver->GetPromise()); - return GET_RETURN_STATUS(env); -} - -napi_status napi_resolve_deferred(napi_env env, - napi_deferred deferred, - napi_value resolution) { - return v8impl::ConcludeDeferred(env, deferred, resolution, true); -} - -napi_status napi_reject_deferred(napi_env env, - napi_deferred deferred, - napi_value resolution) { - return v8impl::ConcludeDeferred(env, deferred, resolution, false); -} - -napi_status napi_is_promise(napi_env env, - napi_value promise, - bool* is_promise) { - CHECK_ENV(env); - CHECK_ARG(env, promise); - CHECK_ARG(env, is_promise); - - *is_promise = v8impl::V8LocalValueFromJsValue(promise)->IsPromise(); - - return napi_clear_last_error(env); -} - -napi_status napi_run_script(napi_env env, - napi_value script, - napi_value* result) { - NAPI_PREAMBLE(env); - CHECK_ARG(env, script); - CHECK_ARG(env, result); - - v8::Local v8_script = v8impl::V8LocalValueFromJsValue(script); - - if (!v8_script->IsString()) { - return napi_set_last_error(env, napi_string_expected); - } - - v8::Local context = env->isolate->GetCurrentContext(); - - auto maybe_script = v8::Script::Compile(context, - v8::Local::Cast(v8_script)); - CHECK_MAYBE_EMPTY(env, maybe_script, napi_generic_failure); - - auto script_result = - maybe_script.ToLocalChecked()->Run(context); - CHECK_MAYBE_EMPTY(env, script_result, napi_generic_failure); - - *result = v8impl::JsValueFromV8LocalValue(script_result.ToLocalChecked()); - return GET_RETURN_STATUS(env); -} - -class TsFn: public node::AsyncResource { - public: - TsFn(v8::Local func, - v8::Local resource, - v8::Local name, - size_t thread_count_, - void* context_, - size_t max_queue_size_, - napi_env env_, - void* finalize_data_, - napi_finalize finalize_cb_, - napi_threadsafe_function_call_js call_js_cb_): - AsyncResource(env_->isolate, - resource, - *v8::String::Utf8Value(env_->isolate, name)), - thread_count(thread_count_), - is_closing(false), - context(context_), - max_queue_size(max_queue_size_), - env(env_), - finalize_data(finalize_data_), - finalize_cb(finalize_cb_), - call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_), - handles_closing(false) { - ref.Reset(env->isolate, func); - node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this); - } - - ~TsFn() { - node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this); - if (ref.IsEmpty()) - return; - ref.ClearWeak(); - ref.Reset(); - } - - // These methods can be called from any thread. - - napi_status Push(void* data, napi_threadsafe_function_call_mode mode) { - node::Mutex::ScopedLock lock(this->mutex); - - while (queue.size() >= max_queue_size && - max_queue_size > 0 && - !is_closing) { - if (mode == napi_tsfn_nonblocking) { - return napi_queue_full; - } - cond->Wait(lock); - } - - if (is_closing) { - if (thread_count == 0) { - return napi_invalid_arg; - } else { - thread_count--; - return napi_closing; - } - } else { - if (uv_async_send(&async) != 0) { - return napi_generic_failure; - } - queue.push(data); - return napi_ok; - } - } - - napi_status Acquire() { - node::Mutex::ScopedLock lock(this->mutex); - - if (is_closing) { - return napi_closing; - } - - thread_count++; - - return napi_ok; - } - - napi_status Release(napi_threadsafe_function_release_mode mode) { - node::Mutex::ScopedLock lock(this->mutex); - - if (thread_count == 0) { - return napi_invalid_arg; - } - - thread_count--; - - if (thread_count == 0 || mode == napi_tsfn_abort) { - if (!is_closing) { - is_closing = (mode == napi_tsfn_abort); - if (is_closing && max_queue_size > 0) { - cond->Signal(lock); - } - if (uv_async_send(&async) != 0) { - return napi_generic_failure; - } - } - } - - return napi_ok; - } - - void EmptyQueueAndDelete() { - for (; !queue.empty() ; queue.pop()) { - call_js_cb(nullptr, nullptr, context, queue.front()); - } - delete this; - } - - // These methods must only be called from the loop thread. - - napi_status Init() { - TsFn* ts_fn = this; - - if (uv_async_init(env->loop, &async, AsyncCb) == 0) { - if (max_queue_size > 0) { - cond.reset(new node::ConditionVariable); - } - if ((max_queue_size == 0 || cond.get() != nullptr) && - uv_idle_init(env->loop, &idle) == 0) { - return napi_ok; - } +} - uv_close(reinterpret_cast(&async), - [] (uv_handle_t* handle) -> void { - TsFn* ts_fn = - node::ContainerOf(&TsFn::async, - reinterpret_cast(handle)); - delete ts_fn; - }); +napi_status napi_delete_async_work(napi_env env, napi_async_work work) { + CHECK_ENV(env); + CHECK_ARG(env, work); - // Prevent the thread-safe function from being deleted here, because - // the callback above will delete it. - ts_fn = nullptr; - } + uvimpl::Work::Delete(reinterpret_cast(work)); - delete ts_fn; + return napi_clear_last_error(env); +} - return napi_generic_failure; - } +napi_status napi_get_uv_event_loop(napi_env env, uv_loop_t** loop) { + CHECK_ENV(env); + CHECK_ARG(env, loop); + *loop = env->loop; + return napi_clear_last_error(env); +} - napi_status Unref() { - uv_unref(reinterpret_cast(&async)); - uv_unref(reinterpret_cast(&idle)); +napi_status napi_queue_async_work(napi_env env, napi_async_work work) { + CHECK_ENV(env); + CHECK_ARG(env, work); - return napi_ok; - } + napi_status status; + uv_loop_t* event_loop = nullptr; + status = napi_get_uv_event_loop(env, &event_loop); + if (status != napi_ok) + return napi_set_last_error(env, status); - napi_status Ref() { - uv_ref(reinterpret_cast(&async)); - uv_ref(reinterpret_cast(&idle)); + uvimpl::Work* w = reinterpret_cast(work); - return napi_ok; - } + CALL_UV(env, uv_queue_work(event_loop, + w->Request(), + uvimpl::Work::ExecuteCallback, + uvimpl::Work::CompleteCallback)); - void DispatchOne() { - void* data = nullptr; - bool popped_value = false; - bool idle_stop_failed = false; + return napi_clear_last_error(env); +} - { - node::Mutex::ScopedLock lock(this->mutex); - if (is_closing) { - CloseHandlesAndMaybeDelete(); - } else { - size_t size = queue.size(); - if (size > 0) { - data = queue.front(); - queue.pop(); - popped_value = true; - if (size == max_queue_size && max_queue_size > 0) { - cond->Signal(lock); - } - size--; - } +napi_status napi_cancel_async_work(napi_env env, napi_async_work work) { + CHECK_ENV(env); + CHECK_ARG(env, work); - if (size == 0) { - if (thread_count == 0) { - is_closing = true; - if (max_queue_size > 0) { - cond->Signal(lock); - } - CloseHandlesAndMaybeDelete(); - } else { - if (uv_idle_stop(&idle) != 0) { - idle_stop_failed = true; - } - } - } - } - } + uvimpl::Work* w = reinterpret_cast(work); - if (popped_value || idle_stop_failed) { - v8::HandleScope scope(env->isolate); - CallbackScope cb_scope(this); + CALL_UV(env, uv_cancel(reinterpret_cast(w->Request()))); - if (idle_stop_failed) { - CHECK(napi_throw_error(env, - "ERR_NAPI_TSFN_STOP_IDLE_LOOP", - "Failed to stop the idle loop") == napi_ok); - } else { - v8::Local js_cb = - v8::Local::New(env->isolate, ref); - call_js_cb(env, - v8impl::JsValueFromV8LocalValue(js_cb), - context, - data); - } - } - } + return napi_clear_last_error(env); +} - node::Environment* NodeEnv() { - // For some reason grabbing the Node.js environment requires a handle scope. - v8::HandleScope scope(env->isolate); - return node::Environment::GetCurrent(env->isolate); - } +napi_status napi_create_promise(napi_env env, + napi_deferred* deferred, + napi_value* promise) { + NAPI_PREAMBLE(env); + CHECK_ARG(env, deferred); + CHECK_ARG(env, promise); - void MaybeStartIdle() { - if (uv_idle_start(&idle, IdleCb) != 0) { - v8::HandleScope scope(env->isolate); - CallbackScope cb_scope(this); - CHECK(napi_throw_error(env, - "ERR_NAPI_TSFN_START_IDLE_LOOP", - "Failed to start the idle loop") == napi_ok); - } - } + auto maybe = v8::Promise::Resolver::New(env->isolate->GetCurrentContext()); + CHECK_MAYBE_EMPTY(env, maybe, napi_generic_failure); - void Finalize() { - v8::HandleScope scope(env->isolate); - if (finalize_cb) { - CallbackScope cb_scope(this); - finalize_cb(env, finalize_data, context); - } - EmptyQueueAndDelete(); - } + auto v8_resolver = maybe.ToLocalChecked(); + auto v8_deferred = new v8::Persistent(); + v8_deferred->Reset(env->isolate, v8_resolver); - inline void* Context() { - return context; - } + *deferred = v8impl::JsDeferredFromV8Persistent(v8_deferred); + *promise = v8impl::JsValueFromV8LocalValue(v8_resolver->GetPromise()); + return GET_RETURN_STATUS(env); +} - void CloseHandlesAndMaybeDelete(bool set_closing = false) { - if (set_closing) { - node::Mutex::ScopedLock lock(this->mutex); - is_closing = true; - if (max_queue_size > 0) { - cond->Signal(lock); - } - } - if (handles_closing) { - return; - } - handles_closing = true; - uv_close( - reinterpret_cast(&async), - [] (uv_handle_t* handle) -> void { - TsFn* ts_fn = node::ContainerOf(&TsFn::async, - reinterpret_cast(handle)); - uv_close( - reinterpret_cast(&ts_fn->idle), - [] (uv_handle_t* handle) -> void { - TsFn* ts_fn = node::ContainerOf(&TsFn::idle, - reinterpret_cast(handle)); - ts_fn->Finalize(); - }); - }); - } +napi_status napi_resolve_deferred(napi_env env, + napi_deferred deferred, + napi_value resolution) { + return v8impl::ConcludeDeferred(env, deferred, resolution, true); +} - // Default way of calling into JavaScript. Used when TsFn is constructed - // without a call_js_cb_. - static void CallJs(napi_env env, napi_value cb, void* context, void* data) { - if (!(env == nullptr || cb == nullptr)) { - napi_value recv; - napi_status status; +napi_status napi_reject_deferred(napi_env env, + napi_deferred deferred, + napi_value resolution) { + return v8impl::ConcludeDeferred(env, deferred, resolution, false); +} - status = napi_get_undefined(env, &recv); - if (status != napi_ok) { - napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED", - "Failed to retrieve undefined value"); - return; - } +napi_status napi_is_promise(napi_env env, + napi_value promise, + bool* is_promise) { + CHECK_ENV(env); + CHECK_ARG(env, promise); + CHECK_ARG(env, is_promise); - status = napi_call_function(env, recv, cb, 0, nullptr, nullptr); - if (status != napi_ok && status != napi_pending_exception) { - napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS", - "Failed to call JS callback"); - return; - } - } - } + *is_promise = v8impl::V8LocalValueFromJsValue(promise)->IsPromise(); - static void IdleCb(uv_idle_t* idle) { - TsFn* ts_fn = - node::ContainerOf(&TsFn::idle, idle); - ts_fn->DispatchOne(); - } + return napi_clear_last_error(env); +} - static void AsyncCb(uv_async_t* async) { - TsFn* ts_fn = - node::ContainerOf(&TsFn::async, async); - ts_fn->MaybeStartIdle(); - } +napi_status napi_run_script(napi_env env, + napi_value script, + napi_value* result) { + NAPI_PREAMBLE(env); + CHECK_ARG(env, script); + CHECK_ARG(env, result); - static void Cleanup(void* data) { - reinterpret_cast(data)->CloseHandlesAndMaybeDelete(true); + v8::Local v8_script = v8impl::V8LocalValueFromJsValue(script); + + if (!v8_script->IsString()) { + return napi_set_last_error(env, napi_string_expected); } - private: - // These are variables protected by the mutex. - node::Mutex mutex; - std::unique_ptr cond; - std::queue queue; - uv_async_t async; - uv_idle_t idle; - size_t thread_count; - bool is_closing; + v8::Local context = env->isolate->GetCurrentContext(); - // These are variables set once, upon creation, and then never again, which - // means we don't need the mutex to read them. - void* context; - size_t max_queue_size; + auto maybe_script = v8::Script::Compile(context, + v8::Local::Cast(v8_script)); + CHECK_MAYBE_EMPTY(env, maybe_script, napi_generic_failure); - // These are variables accessed only from the loop thread. - v8::Persistent ref; - napi_env env; - void* finalize_data; - napi_finalize finalize_cb; - napi_threadsafe_function_call_js call_js_cb; - bool handles_closing; -}; + auto script_result = + maybe_script.ToLocalChecked()->Run(context); + CHECK_MAYBE_EMPTY(env, script_result, napi_generic_failure); + + *result = v8impl::JsValueFromV8LocalValue(script_result.ToLocalChecked()); + return GET_RETURN_STATUS(env); +} -NAPI_EXTERN napi_status +napi_status napi_create_threadsafe_function(napi_env env, napi_value func, napi_value async_resource, @@ -3957,16 +3960,17 @@ napi_create_threadsafe_function(napi_env env, v8::Local v8_name; CHECK_TO_STRING(env, v8_context, v8_name, async_resource_name); - TsFn* ts_fn = new TsFn(v8_func, - v8_resource, - v8_name, - initial_thread_count, - context, - max_queue_size, - env, - thread_finalize_data, - thread_finalize_cb, - call_js_cb); + v8impl::ThreadSafeFunction* ts_fn = + new v8impl::ThreadSafeFunction(v8_func, + v8_resource, + v8_name, + initial_thread_count, + context, + max_queue_size, + env, + thread_finalize_data, + thread_finalize_cb, + call_js_cb); if (ts_fn == nullptr) { status = napi_generic_failure; @@ -3981,45 +3985,46 @@ napi_create_threadsafe_function(napi_env env, return napi_set_last_error(env, status); } -NAPI_EXTERN napi_status +napi_status napi_get_threadsafe_function_context(napi_threadsafe_function func, void** result) { CHECK(func != nullptr); CHECK(result != nullptr); - *result = reinterpret_cast(func)->Context(); + *result = reinterpret_cast(func)->Context(); return napi_ok; } -NAPI_EXTERN napi_status +napi_status napi_call_threadsafe_function(napi_threadsafe_function func, void* data, napi_threadsafe_function_call_mode is_blocking) { CHECK(func != nullptr); - return reinterpret_cast(func)->Push(data, is_blocking); + return reinterpret_cast(func)->Push(data, + is_blocking); } -NAPI_EXTERN napi_status +napi_status napi_acquire_threadsafe_function(napi_threadsafe_function func) { CHECK(func != nullptr); - return reinterpret_cast(func)->Acquire(); + return reinterpret_cast(func)->Acquire(); } -NAPI_EXTERN napi_status +napi_status napi_release_threadsafe_function(napi_threadsafe_function func, napi_threadsafe_function_release_mode mode) { CHECK(func != nullptr); - return reinterpret_cast(func)->Release(mode); + return reinterpret_cast(func)->Release(mode); } -NAPI_EXTERN napi_status +napi_status napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func) { CHECK(func != nullptr); - return reinterpret_cast(func)->Unref(); + return reinterpret_cast(func)->Unref(); } -NAPI_EXTERN napi_status +napi_status napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func) { CHECK(func != nullptr); - return reinterpret_cast(func)->Ref(); + return reinterpret_cast(func)->Ref(); } From 8efb65bdbf472add5f4d1da91bc9ce1f5aed9f4d Mon Sep 17 00:00:00 2001 From: Lars-Magnus Skog Date: Fri, 21 Sep 2018 16:17:38 +0200 Subject: [PATCH 07/10] doc: fix optional parameters in n-api.md The thread_finalize_data and thread_finalize_cb parameters in napi_create_threadsafe_function are optional. PR-URL: https://github.com/nodejs/node/pull/22998 Reviewed-By: James M Snell Reviewed-By: Anna Henningsen --- doc/api/n-api.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/api/n-api.md b/doc/api/n-api.md index 7e7ed9efd3e88a..91368267c42663 100644 --- a/doc/api/n-api.md +++ b/doc/api/n-api.md @@ -4056,8 +4056,8 @@ by the `async_hooks` API. - `[in] max_queue_size`: Maximum size of the queue. 0 for no limit. - `[in] initial_thread_count`: The initial number of threads, including the main thread, which will be making use of this function. -- `[in] thread_finalize_data`: Data to be passed to `thread_finalize_cb`. -- `[in] thread_finalize_cb`: Function to call when the +- `[in] thread_finalize_data`: Optional data to be passed to `thread_finalize_cb`. +- `[in] thread_finalize_cb`: Optional function to call when the `napi_threadsafe_function` is being destroyed. - `[in] context`: Optional data to attach to the resulting `napi_threadsafe_function`. From f116dc4b2ac9074f748192d3fe3b72833ab76092 Mon Sep 17 00:00:00 2001 From: Daniel Bevenius Date: Wed, 31 Oct 2018 15:41:32 +0100 Subject: [PATCH 08/10] n-api: add missing handle scopes Currently when building with --debug test/addons-napi/test_threadsafe_function will error: $ out/Debug/node test/addons-napi/test_threadsafe_function/test.js FATAL ERROR: v8::HandleScope::CreateHandle() Cannot create a handle without a HandleScope 1: 0x10004e287 node::DumpBacktrace(__sFILE*) [node/out/Debug/node] 2: 0x1000cd37b node::Abort() [/node/out/Debug/node] 3: 0x1000cd69f node::OnFatalError(char const*, char const*) [/node/out/Debug/node] 4: 0x1004df0b1 v8::Utils::ReportApiFailure(char const*, char const*) [/nodejs/node/out/Debug/node] 5: 0x100a8c0a9 v8::internal::HandleScope::Extend( v8::internal::Isolate*) [/node/out/Debug/node] 6: 0x1004e4229 v8::EmbedderDataFor(v8::Context*, int, bool, char const*) [/node/out/Debug/node] 7: 0x1004e43fa v8::Context::SlowGetAlignedPointerFromEmbedderData(int) [/node/out/Debug/node] 8: 0x10001c26b v8::Context::GetAlignedPointerFromEmbedderData(int) [/node/out/Debug/node] 9: 0x1000144ea node::Environment::GetCurrent(v8::Local) [/node/out/Debug/node] 10: 0x1000f49e2 napi_env__::node_env() const [/node/out/Debug/node] 11: 0x1000f9885 (anonymous namespace)::v8impl::ThreadSafeFunction:: CloseHandlesAndMaybeDelete(bool) [/node/out/Debug/node] 12: 0x1000fb34f (anonymous namespace)::v8impl::ThreadSafeFunction:: DispatchOne() [/node/out/Debug/node] 13: 0x1000fb129 (anonymous namespace)::v8impl::ThreadSafeFunction:: IdleCb(uv_idle_s*) [/node/out/Debug/node] 14: 0x1011a1b69 uv__run_idle [/node/out/Debug/node] 15: 0x101198179 uv_run [/node/out/Debug/node] 16: 0x1000dfca1 node::Start(...) [/node/out/Debug/node] 17: 0x1000dae50 node::Start(...) [/node/out/Debug/node] 18: 0x1000da56f node::Start(int, char**) [/node/out/Debug/node] 19: 0x10141112e main [/node/out/Debug/node] 20: 0x100001034 start [/node/out/Debug/node] Abort trap: 6 This commit adds two HandleScope's, one to CloseHandlesAndMaybeDelete and one to the lambda. SlowGetAlignedPointerFromEmbedderData will only be called for debug builds: https://github.com/v8/v8/blob/2ef0aa662fe907a1b36ac1abe7d77ad2bcd27733 /include/v8.h#L10440-L10447 PR-URL: https://github.com/nodejs/node/pull/24011 Reviewed-By: James M Snell Reviewed-By: Michael Dawson --- src/node_api.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/node_api.cc b/src/node_api.cc index a4408b70f24c10..196f5512c9cd0f 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -1097,6 +1097,7 @@ class ThreadSafeFunction : public node::AsyncResource { } void CloseHandlesAndMaybeDelete(bool set_closing = false) { + v8::HandleScope scope(env->isolate); if (set_closing) { node::Mutex::ScopedLock lock(this->mutex); is_closing = true; @@ -1114,6 +1115,7 @@ class ThreadSafeFunction : public node::AsyncResource { ThreadSafeFunction* ts_fn = node::ContainerOf(&ThreadSafeFunction::async, reinterpret_cast(handle)); + v8::HandleScope scope(ts_fn->env->isolate); uv_close( reinterpret_cast(&ts_fn->idle), [] (uv_handle_t* handle) -> void { From e0e1ac2d0317bd407bf935f1a4ecb338c650f037 Mon Sep 17 00:00:00 2001 From: Gireesh Punathil Date: Thu, 29 Nov 2018 10:26:53 +0530 Subject: [PATCH 09/10] test: mark test_threadsafe_function/test as flaky The test fails consistently on windows-fanned with vs2017. mark it as flaky while the issue is being progressed, and to keep CI green / amber. Ref: https://github.com/nodejs/node/issues/23621 PR-URL: https://github.com/nodejs/node/pull/24714 Reviewed-By: James M Snell Reviewed-By: Rich Trott --- test/addons-napi/addons-napi.status | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 test/addons-napi/addons-napi.status diff --git a/test/addons-napi/addons-napi.status b/test/addons-napi/addons-napi.status new file mode 100644 index 00000000000000..dffcf3787bfe47 --- /dev/null +++ b/test/addons-napi/addons-napi.status @@ -0,0 +1,11 @@ +prefix addons-napi + +# To mark a test as flaky, list the test name in the appropriate section +# below, without ".js", followed by ": PASS,FLAKY". Example: +# sample-test : PASS,FLAKY + +[true] # This section applies to all platforms + +[$system==win32] +# https://github.com/nodejs/node/issues/23621 +test_threadsafe_function/test: PASS,FLAKY From f2e451027df5c18730d22d39dd56e2f99b2f1443 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 26 Dec 2018 13:21:57 +0100 Subject: [PATCH 10/10] test: fix test-repl-envvars MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In 180f86507d496b11aa35b2df4594629a92cce329, the test was changed so that the `env` argument of `createInternalRepl()` also contained external environment variables, because keeping them can be necessary for spawning processes on some systems. However, this test does not spawn new processes, and relies on the fact that the environment variables it tests are not already set (and fails otherwise); therefore, reverting to the original state should fix this. Fixes: https://github.com/nodejs/node/issues/21451 Fixes: https://github.com/nodejs/build/issues/1377 Refs: https://github.com/nodejs/node/pull/25219 PR-URL: https://github.com/nodejs/node/pull/25226 Reviewed-By: Rich Trott Reviewed-By: Tobias Nießen Reviewed-By: Benjamin Gruenbaum Reviewed-By: Denys Otrishko Reviewed-By: Colin Ihrig Reviewed-By: Luigi Pinca --- test/parallel/test-repl-envvars.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-repl-envvars.js b/test/parallel/test-repl-envvars.js index d29e7b3574c1f2..c4efd184c4c91c 100644 --- a/test/parallel/test-repl-envvars.js +++ b/test/parallel/test-repl-envvars.js @@ -36,7 +36,7 @@ const tests = [ ]; function run(test) { - const env = Object.assign({}, process.env, test.env); + const env = test.env; const expected = test.expected; const opts = { terminal: true,