Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: use env’s instead of isolate’s RequestInterrupt() + v8::Platform #32523

Closed
wants to merge 7 commits into from
48 changes: 42 additions & 6 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ using v8::NewStringType;
using v8::Number;
using v8::Object;
using v8::Private;
using v8::Script;
using v8::SnapshotCreator;
using v8::StackTrace;
using v8::String;
using v8::Symbol;
using v8::TracingController;
using v8::TryCatch;
using v8::Undefined;
using v8::Value;
using worker::Worker;
Expand Down Expand Up @@ -394,7 +396,30 @@ Environment::Environment(IsolateData* isolate_data,
}

Environment::~Environment() {
if (interrupt_data_ != nullptr) *interrupt_data_ = nullptr;
if (Environment** interrupt_data = interrupt_data_.load()) {
// There are pending RequestInterrupt() callbacks. Tell them not to run,
// then force V8 to run interrupts by compiling and running an empty script
// so as not to leak memory.
*interrupt_data = nullptr;

Isolate::AllowJavascriptExecutionScope allow_js_here(isolate());
HandleScope handle_scope(isolate());
TryCatch try_catch(isolate());
Context::Scope context_scope(context());

#ifdef DEBUG
bool consistency_check = false;
isolate()->RequestInterrupt([](Isolate*, void* data) {
*static_cast<bool*>(data) = true;
}, &consistency_check);
#endif

Local<Script> script;
if (Script::Compile(context(), String::Empty(isolate())).ToLocal(&script))
USE(script->Run(context()));

DCHECK(consistency_check);
}

// FreeEnvironment() should have set this.
CHECK(is_stopping());
Expand Down Expand Up @@ -735,12 +760,23 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
}

void Environment::RequestInterruptFromV8() {
if (interrupt_data_ != nullptr) return; // Already scheduled.

// The Isolate may outlive the Environment, so some logic to handle the
// situation in which the Environment is destroyed before the handler runs
// is required.
interrupt_data_ = new Environment*(this);

// We allocate a new pointer to a pointer to this Environment instance, and
// try to set it as interrupt_data_. If interrupt_data_ was already set, then
// callbacks are already scheduled to run and we can delete our own pointer
// and just return. If it was nullptr previously, the Environment** is stored;
// ~Environment sets the Environment* contained in it to nullptr, so that
// the callback can check whether ~Environment has already run and it is thus
// not safe to access the Environment instance itself.
Environment** interrupt_data = new Environment*(this);
Environment** dummy = nullptr;
if (!interrupt_data_.compare_exchange_strong(dummy, interrupt_data)) {
delete interrupt_data;
return; // Already scheduled.
}
Comment on lines +774 to +779
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's worth adding a comment here explaining what this code does.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mmarchini Done, PTAL :)


isolate()->RequestInterrupt([](Isolate* isolate, void* data) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember if this callback runs asynchronously. Any chance we could get a situation where we request the interrupt here but don't run it before the Isolate is deleted? If that's possible, we would have a leak on interrupt_data.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right – that is a possibility, but I don’t quite know what to do about it (without changing V8 APIs). The leak was already there before, too, and it’s limited to the size of a pointer…

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I have an idea. Let me see. :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. If ASAN starts to complain, we can force it to ignore this pointer (for now ASAN is happy, so we can leave it as is).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mmarchini 7e75cc1 should address this by flushing interrupts when freeing the Environment instance … feels a bit like a hack to me but I think it’s the best we can do :)

std::unique_ptr<Environment*> env_ptr { static_cast<Environment**>(data) };
Expand All @@ -751,9 +787,9 @@ void Environment::RequestInterruptFromV8() {
// handled during cleanup.
return;
}
env->interrupt_data_ = nullptr;
env->interrupt_data_.store(nullptr);
env->RunAndClearInterrupts();
}, interrupt_data_);
}, interrupt_data);
}

void Environment::ScheduleTimer(int64_t duration_ms) {
Expand Down
7 changes: 4 additions & 3 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,9 @@ class Environment : public MemoryRetainer {

inline void set_main_utf16(std::unique_ptr<v8::String::Value>);

void RunAndClearNativeImmediates(bool only_refed = false);
void RunAndClearInterrupts();

private:
template <typename Fn>
inline void CreateImmediate(Fn&& cb, bool ref);
Expand Down Expand Up @@ -1440,9 +1443,7 @@ class Environment : public MemoryRetainer {
// yet or already have been destroyed.
bool task_queues_async_initialized_ = false;

void RunAndClearNativeImmediates(bool only_refed = false);
void RunAndClearInterrupts();
Environment** interrupt_data_ = nullptr;
std::atomic<Environment**> interrupt_data_ {nullptr};
void RequestInterruptFromV8();
static void CheckImmediate(uv_check_t* handle);

Expand Down
41 changes: 8 additions & 33 deletions src/inspector/main_thread_interface.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "main_thread_interface.h"

#include "env-inl.h"
#include "node_mutex.h"
#include "v8-inspector.h"
#include "util-inl.h"
Expand Down Expand Up @@ -85,19 +86,6 @@ class CallRequest : public Request {
Fn fn_;
};

class DispatchMessagesTask : public v8::Task {
public:
explicit DispatchMessagesTask(std::weak_ptr<MainThreadInterface> thread)
: thread_(thread) {}

void Run() override {
if (auto thread = thread_.lock()) thread->DispatchMessages();
}

private:
std::weak_ptr<MainThreadInterface> thread_;
};

template <typename T>
class AnotherThreadObjectReference {
public:
Expand Down Expand Up @@ -212,36 +200,23 @@ class ThreadSafeDelegate : public InspectorSessionDelegate {
} // namespace


MainThreadInterface::MainThreadInterface(Agent* agent, uv_loop_t* loop,
v8::Isolate* isolate,
v8::Platform* platform)
: agent_(agent), isolate_(isolate),
platform_(platform) {
}
MainThreadInterface::MainThreadInterface(Agent* agent) : agent_(agent) {}

MainThreadInterface::~MainThreadInterface() {
if (handle_)
handle_->Reset();
}

void MainThreadInterface::Post(std::unique_ptr<Request> request) {
CHECK_NOT_NULL(agent_);
Mutex::ScopedLock scoped_lock(requests_lock_);
bool needs_notify = requests_.empty();
requests_.push_back(std::move(request));
if (needs_notify) {
if (isolate_ != nullptr && platform_ != nullptr) {
std::shared_ptr<v8::TaskRunner> taskrunner =
platform_->GetForegroundTaskRunner(isolate_);
std::weak_ptr<MainThreadInterface>* interface_ptr =
new std::weak_ptr<MainThreadInterface>(shared_from_this());
taskrunner->PostTask(
std::make_unique<DispatchMessagesTask>(*interface_ptr));
isolate_->RequestInterrupt([](v8::Isolate* isolate, void* opaque) {
std::unique_ptr<std::weak_ptr<MainThreadInterface>> interface_ptr {
static_cast<std::weak_ptr<MainThreadInterface>*>(opaque) };
if (auto iface = interface_ptr->lock()) iface->DispatchMessages();
}, static_cast<void*>(interface_ptr));
}
std::weak_ptr<MainThreadInterface> weak_self {shared_from_this()};
agent_->env()->RequestInterrupt([weak_self](Environment*) {
if (auto iface = weak_self.lock()) iface->DispatchMessages();
});
}
incoming_message_cond_.Broadcast(scoped_lock);
}
Expand Down Expand Up @@ -274,7 +249,7 @@ void MainThreadInterface::DispatchMessages() {
std::swap(dispatching_message_queue_.front(), task);
dispatching_message_queue_.pop_front();

v8::SealHandleScope seal_handle_scope(isolate_);
v8::SealHandleScope seal_handle_scope(agent_->env()->isolate());
task->Call(this);
}
} while (had_messages);
Expand Down
5 changes: 1 addition & 4 deletions src/inspector/main_thread_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ class MainThreadHandle : public std::enable_shared_from_this<MainThreadHandle> {
class MainThreadInterface :
public std::enable_shared_from_this<MainThreadInterface> {
public:
MainThreadInterface(Agent* agent, uv_loop_t*, v8::Isolate* isolate,
v8::Platform* platform);
explicit MainThreadInterface(Agent* agent);
~MainThreadInterface();

void DispatchMessages();
Expand All @@ -98,8 +97,6 @@ class MainThreadInterface :
ConditionVariable incoming_message_cond_;
// Used from any thread
Agent* const agent_;
v8::Isolate* const isolate_;
v8::Platform* const platform_;
std::shared_ptr<MainThreadHandle> handle_;
std::unordered_map<int, std::unique_ptr<Deletable>> managed_objects_;
};
Expand Down
73 changes: 30 additions & 43 deletions src/inspector_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ using v8::Message;
using v8::Object;
using v8::String;
using v8::Task;
using v8::TaskRunner;
using v8::Value;

using v8_inspector::StringBuffer;
Expand All @@ -63,18 +62,6 @@ static std::atomic_bool start_io_thread_async_initialized { false };
// Protects the Agent* stored in start_io_thread_async.data.
static Mutex start_io_thread_async_mutex;

class StartIoTask : public Task {
public:
explicit StartIoTask(Agent* agent) : agent(agent) {}

void Run() override {
agent->StartIoThread();
}

private:
Agent* agent;
};

std::unique_ptr<StringBuffer> ToProtocolString(Isolate* isolate,
Local<Value> value) {
TwoByteValue buffer(isolate, value);
Expand All @@ -86,10 +73,6 @@ void StartIoThreadAsyncCallback(uv_async_t* handle) {
static_cast<Agent*>(handle->data)->StartIoThread();
}

void StartIoInterrupt(Isolate* isolate, void* agent) {
static_cast<Agent*>(agent)->StartIoThread();
}


#ifdef __POSIX__
static void StartIoThreadWakeup(int signo, siginfo_t* info, void* ucontext) {
Expand Down Expand Up @@ -357,32 +340,26 @@ class InspectorTimer {
int64_t interval_ms = 1000 * interval_s;
uv_timer_start(&timer_, OnTimer, interval_ms, interval_ms);
timer_.data = this;

env->AddCleanupHook(CleanupHook, this);
}

InspectorTimer(const InspectorTimer&) = delete;

void Stop() {
env_->RemoveCleanupHook(CleanupHook, this);
if (timer_.data == nullptr) return;

if (timer_.data == this) {
timer_.data = nullptr;
uv_timer_stop(&timer_);
env_->CloseHandle(reinterpret_cast<uv_handle_t*>(&timer_), TimerClosedCb);
}
timer_.data = nullptr;
uv_timer_stop(&timer_);
env_->CloseHandle(reinterpret_cast<uv_handle_t*>(&timer_), TimerClosedCb);
}

inline Environment* env() const { return env_; }

private:
static void OnTimer(uv_timer_t* uvtimer) {
InspectorTimer* timer = node::ContainerOf(&InspectorTimer::timer_, uvtimer);
timer->callback_(timer->data_);
}

static void CleanupHook(void* data) {
static_cast<InspectorTimer*>(data)->Stop();
}

static void TimerClosedCb(uv_handle_t* uvtimer) {
std::unique_ptr<InspectorTimer> timer(
node::ContainerOf(&InspectorTimer::timer_,
Expand All @@ -405,16 +382,29 @@ class InspectorTimerHandle {
InspectorTimerHandle(Environment* env, double interval_s,
V8InspectorClient::TimerCallback callback, void* data) {
timer_ = new InspectorTimer(env, interval_s, callback, data);

env->AddCleanupHook(CleanupHook, this);
}

InspectorTimerHandle(const InspectorTimerHandle&) = delete;

~InspectorTimerHandle() {
CHECK_NOT_NULL(timer_);
timer_->Stop();
timer_ = nullptr;
Stop();
}

private:
void Stop() {
if (timer_ != nullptr) {
timer_->env()->RemoveCleanupHook(CleanupHook, this);
timer_->Stop();
}
timer_ = nullptr;
}

static void CleanupHook(void* data) {
static_cast<InspectorTimerHandle*>(data)->Stop();
}

InspectorTimer* timer_;
};

Expand Down Expand Up @@ -672,8 +662,7 @@ class NodeInspectorClient : public V8InspectorClient {
std::shared_ptr<MainThreadHandle> getThreadHandle() {
if (!interface_) {
interface_ = std::make_shared<MainThreadInterface>(
env_->inspector_agent(), env_->event_loop(), env_->isolate(),
env_->isolate_data()->platform());
env_->inspector_agent());
}
return interface_->GetHandle();
}
Expand Down Expand Up @@ -709,10 +698,9 @@ class NodeInspectorClient : public V8InspectorClient {

running_nested_loop_ = true;

MultiIsolatePlatform* platform = env_->isolate_data()->platform();
while (shouldRunMessageLoop()) {
if (interface_) interface_->WaitForFrontendEvent();
while (platform->FlushForegroundTasks(env_->isolate())) {}
env_->RunAndClearInterrupts();
}
running_nested_loop_ = false;
}
Expand All @@ -737,8 +725,9 @@ class NodeInspectorClient : public V8InspectorClient {
bool is_main_;
bool running_nested_loop_ = false;
std::unique_ptr<V8Inspector> client_;
std::unordered_map<int, std::unique_ptr<ChannelImpl>> channels_;
// Note: ~ChannelImpl may access timers_ so timers_ has to come first.
std::unordered_map<void*, InspectorTimerHandle> timers_;
std::unordered_map<int, std::unique_ptr<ChannelImpl>> channels_;
int next_session_id_ = 1;
bool waiting_for_resume_ = false;
bool waiting_for_frontend_ = false;
Expand Down Expand Up @@ -970,12 +959,10 @@ void Agent::RequestIoThreadStart() {
// for IO events)
CHECK(start_io_thread_async_initialized);
uv_async_send(&start_io_thread_async);
Isolate* isolate = parent_env_->isolate();
v8::Platform* platform = parent_env_->isolate_data()->platform();
std::shared_ptr<TaskRunner> taskrunner =
platform->GetForegroundTaskRunner(isolate);
taskrunner->PostTask(std::make_unique<StartIoTask>(this));
isolate->RequestInterrupt(StartIoInterrupt, this);
parent_env_->RequestInterrupt([this](Environment*) {
StartIoThread();
});

CHECK(start_io_thread_async_initialized);
uv_async_send(&start_io_thread_async);
}
Expand Down
2 changes: 2 additions & 0 deletions src/inspector_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ class Agent {
// Interface for interacting with inspectors in worker threads
std::shared_ptr<WorkerManager> GetWorkerManager();

inline Environment* env() const { return parent_env_; }

private:
void ToggleAsyncHook(v8::Isolate* isolate,
const v8::Global<v8::Function>& fn);
Expand Down
2 changes: 1 addition & 1 deletion src/inspector_js_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class JSBindingsConnection : public AsyncWrap {

private:
Environment* env_;
JSBindingsConnection* connection_;
BaseObjectPtr<JSBindingsConnection> connection_;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, I’m not sure whether this is necessary anymore, but it was at one point while working on this, and it doesn’t hurt for the code to be a bit more robust here.

};

JSBindingsConnection(Environment* env,
Expand Down
6 changes: 6 additions & 0 deletions test/parallel/test-inspector-connect-main-thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ function doConsoleLog(arrayBuffer) {
// and do not interrupt the main code. Interrupting the code flow
// can lead to unexpected behaviors.
async function ensureListenerDoesNotInterrupt(session) {
// Make sure that the following code is not affected by the fact that it may
// run inside an inspector message callback, during which other inspector
// message callbacks (such as the one triggered by doConsoleLog()) would
// not be processed.
await new Promise(setImmediate);

const currentTime = Date.now();
let consoleLogHappened = false;
session.once('Runtime.consoleAPICalled',
Expand Down