diff --git a/src/node_platform.cc b/src/node_platform.cc index 947622a219cdef..2dcd7320426dad 100644 --- a/src/node_platform.cc +++ b/src/node_platform.cc @@ -2,6 +2,7 @@ #include "node_internals.h" #include "env-inl.h" +#include "debug_utils.h" #include "util.h" #include @@ -25,7 +26,127 @@ static void BackgroundRunner(void* data) { } } +class BackgroundTaskRunner::DelayedTaskScheduler { + public: + explicit DelayedTaskScheduler(TaskQueue* tasks) + : pending_worker_tasks_(tasks) {} + + std::unique_ptr Start() { + auto start_thread = [](void* data) { + static_cast(data)->Run(); + }; + std::unique_ptr t { new uv_thread_t() }; + uv_sem_init(&ready_, 0); + CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this)); + uv_sem_wait(&ready_); + uv_sem_destroy(&ready_); + return t; + } + + void PostDelayedTask(std::unique_ptr task, double delay_in_seconds) { + tasks_.Push(std::unique_ptr(new ScheduleTask(this, std::move(task), + delay_in_seconds))); + uv_async_send(&flush_tasks_); + } + + void Stop() { + tasks_.Push(std::unique_ptr(new StopTask(this))); + uv_async_send(&flush_tasks_); + } + + private: + void Run() { + TRACE_EVENT_METADATA1("__metadata", "thread_name", "name", + "WorkerThreadsTaskRunner::DelayedTaskScheduler"); + loop_.data = this; + CHECK_EQ(0, uv_loop_init(&loop_)); + flush_tasks_.data = this; + CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks)); + uv_sem_post(&ready_); + + uv_run(&loop_, UV_RUN_DEFAULT); + CheckedUvLoopClose(&loop_); + } + + static void FlushTasks(uv_async_t* flush_tasks) { + DelayedTaskScheduler* scheduler = + ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop); + while (std::unique_ptr task = scheduler->tasks_.Pop()) + task->Run(); + } + + class StopTask : public Task { + public: + explicit StopTask(DelayedTaskScheduler* scheduler): scheduler_(scheduler) {} + + void Run() override { + std::vector timers; + for (uv_timer_t* timer : scheduler_->timers_) + timers.push_back(timer); + for (uv_timer_t* timer : timers) + scheduler_->TakeTimerTask(timer); + uv_close(reinterpret_cast(&scheduler_->flush_tasks_), + [](uv_handle_t* handle) {}); + } + + private: + DelayedTaskScheduler* scheduler_; + }; + + class ScheduleTask : public Task { + public: + ScheduleTask(DelayedTaskScheduler* scheduler, + std::unique_ptr task, + double delay_in_seconds) + : scheduler_(scheduler), + task_(std::move(task)), + delay_in_seconds_(delay_in_seconds) {} + + void Run() override { + uint64_t delay_millis = + static_cast(delay_in_seconds_ + 0.5) * 1000; + std::unique_ptr timer(new uv_timer_t()); + CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get())); + timer->data = task_.release(); + CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0)); + scheduler_->timers_.insert(timer.release()); + } + + private: + DelayedTaskScheduler* scheduler_; + std::unique_ptr task_; + double delay_in_seconds_; + }; + + static void RunTask(uv_timer_t* timer) { + DelayedTaskScheduler* scheduler = + ContainerOf(&DelayedTaskScheduler::loop_, timer->loop); + scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer)); + } + + std::unique_ptr TakeTimerTask(uv_timer_t* timer) { + std::unique_ptr task(static_cast(timer->data)); + uv_timer_stop(timer); + uv_close(reinterpret_cast(timer), [](uv_handle_t* handle) { + delete reinterpret_cast(handle); + }); + timers_.erase(timer); + return task; + } + + uv_sem_t ready_; + TaskQueue* pending_worker_tasks_; + + TaskQueue tasks_; + uv_loop_t loop_; + uv_async_t flush_tasks_; + std::unordered_set timers_; +}; + BackgroundTaskRunner::BackgroundTaskRunner(int thread_pool_size) { + delayed_task_scheduler_.reset( + new DelayedTaskScheduler(&background_tasks_)); + threads_.push_back(delayed_task_scheduler_->Start()); for (int i = 0; i < thread_pool_size; i++) { std::unique_ptr t { new uv_thread_t() }; if (uv_thread_create(t.get(), BackgroundRunner, &background_tasks_) != 0) @@ -44,7 +165,7 @@ void BackgroundTaskRunner::PostIdleTask(std::unique_ptr task) { void BackgroundTaskRunner::PostDelayedTask(std::unique_ptr task, double delay_in_seconds) { - UNREACHABLE(); + delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds); } void BackgroundTaskRunner::BlockingDrain() { @@ -53,6 +174,7 @@ void BackgroundTaskRunner::BlockingDrain() { void BackgroundTaskRunner::Shutdown() { background_tasks_.Stop(); + delayed_task_scheduler_->Stop(); for (size_t i = 0; i < threads_.size(); i++) { CHECK_EQ(0, uv_thread_join(threads_[i].get())); } diff --git a/src/node_platform.h b/src/node_platform.h index 77826e057771b0..78b2883393b9fa 100644 --- a/src/node_platform.h +++ b/src/node_platform.h @@ -110,6 +110,10 @@ class BackgroundTaskRunner : public v8::TaskRunner { size_t NumberOfAvailableBackgroundThreads() const; private: TaskQueue background_tasks_; + + class DelayedTaskScheduler; + std::unique_ptr delayed_task_scheduler_; + std::vector> threads_; }; diff --git a/test/sequential/test-inspector-runtime-evaluate-with-timeout.js b/test/sequential/test-inspector-runtime-evaluate-with-timeout.js new file mode 100644 index 00000000000000..1def39a82fead4 --- /dev/null +++ b/test/sequential/test-inspector-runtime-evaluate-with-timeout.js @@ -0,0 +1,21 @@ +// Flags: --expose-internals +'use strict'; + +const common = require('../common'); +common.skipIfInspectorDisabled(); + +(async function test() { + const { strictEqual } = require('assert'); + const { Session } = require('inspector'); + const { promisify } = require('util'); + + const session = new Session(); + session.connect(); + session.post = promisify(session.post); + const result = await session.post('Runtime.evaluate', { + expression: 'for(;;);', + timeout: 0 + }).catch((e) => e); + strictEqual(result.message, 'Execution was terminated'); + session.disconnect(); +})();