Skip to content

Commit

Permalink
inspector: tie objects lifetime to the thread they belong to
Browse files Browse the repository at this point in the history
PR-URL: #22242
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Aleksei Koziatinskii <[email protected]>
  • Loading branch information
eugeneo committed Aug 14, 2018
1 parent 8018146 commit 7404a4d
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 67 deletions.
185 changes: 123 additions & 62 deletions src/inspector/main_thread_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "node_mutex.h"
#include "v8-inspector.h"

#include <functional>
#include <unicode/unistr.h>

namespace node {
Expand All @@ -13,56 +14,72 @@ using v8_inspector::StringView;
using v8_inspector::StringBuffer;

template <typename T>
class DeleteRequest : public Request {
class DeletableWrapper : public Deletable {
public:
explicit DeleteRequest(T* object) : object_(object) {}
void Call() override {
delete object_;
explicit DeletableWrapper(std::unique_ptr<T> object)
: object_(std::move(object)) {}
~DeletableWrapper() override = default;

static T* get(MainThreadInterface* thread, int id) {
return
static_cast<DeletableWrapper<T>*>(thread->GetObject(id))->object_.get();
}

private:
T* object_;
std::unique_ptr<T> object_;
};

template <typename Target, typename Arg>
class SingleArgumentFunctionCall : public Request {
public:
using Fn = void (Target::*)(Arg);
template <typename T>
std::unique_ptr<Deletable> WrapInDeletable(std::unique_ptr<T> object) {
return std::unique_ptr<DeletableWrapper<T>>(
new DeletableWrapper<T>(std::move(object)));
}

SingleArgumentFunctionCall(Target* target, Fn fn, Arg argument)
: target_(target),
fn_(fn),
arg_(std::move(argument)) {}
template <typename Factory>
class CreateObjectRequest : public Request {
public:
CreateObjectRequest(int object_id, Factory factory)
: object_id_(object_id), factory_(std::move(factory)) {}

void Call() override {
Apply(target_, fn_, std::move(arg_));
void Call(MainThreadInterface* thread) {
thread->AddObject(object_id_, WrapInDeletable(factory_(thread)));
}

private:
template <typename Element>
void Apply(Element* target, Fn fn, Arg arg) {
(target->*fn)(std::move(arg));
int object_id_;
Factory factory_;
};

template <typename Factory>
std::unique_ptr<Request> NewCreateRequest(int object_id, Factory factory) {
return std::unique_ptr<Request>(
new CreateObjectRequest<Factory>(object_id, std::move(factory)));
}

class DeleteRequest : public Request {
public:
explicit DeleteRequest(int object_id) : object_id_(object_id) {}

void Call(MainThreadInterface* thread) override {
thread->RemoveObject(object_id_);
}

Target* target_;
Fn fn_;
Arg arg_;
private:
int object_id_;
};

class PostMessageRequest : public Request {
template <typename Target, typename Fn>
class CallRequest : public Request {
public:
PostMessageRequest(InspectorSessionDelegate* delegate,
StringView message)
: delegate_(delegate),
message_(StringBuffer::create(message)) {}
CallRequest(int id, Fn fn) : id_(id), fn_(std::move(fn)) {}

void Call() override {
delegate_->SendMessageToFrontend(message_->string());
void Call(MainThreadInterface* thread) override {
fn_(DeletableWrapper<Target>::get(thread, id_));
}

private:
InspectorSessionDelegate* delegate_;
std::unique_ptr<StringBuffer> message_;
int id_;
Fn fn_;
};

class DispatchMessagesTask : public v8::Task {
Expand All @@ -88,45 +105,63 @@ void DisposePairCallback(uv_handle_t* ref) {
template <typename T>
class AnotherThreadObjectReference {
public:
// We create it on whatever thread, just make sure it gets disposed on the
// proper thread.
AnotherThreadObjectReference(std::shared_ptr<MainThreadHandle> thread,
T* object)
: thread_(thread), object_(object) {
AnotherThreadObjectReference(
std::shared_ptr<MainThreadHandle> thread, int object_id)
: thread_(thread), object_id_(object_id) {}

template <typename Factory>
AnotherThreadObjectReference(
std::shared_ptr<MainThreadHandle> thread, Factory factory)
: AnotherThreadObjectReference(thread, thread->newObjectId()) {
thread_->Post(NewCreateRequest(object_id_, std::move(factory)));
}
AnotherThreadObjectReference(AnotherThreadObjectReference&) = delete;

~AnotherThreadObjectReference() {
// Disappearing thread may cause a memory leak
CHECK(thread_->Post(
std::unique_ptr<DeleteRequest<T>>(new DeleteRequest<T>(object_))));
object_ = nullptr;
thread_->Post(
std::unique_ptr<DeleteRequest>(new DeleteRequest(object_id_)));
}

template <typename Fn, typename Arg>
void Post(Fn fn, Arg argument) const {
using R = SingleArgumentFunctionCall<T, Arg>;
thread_->Post(std::unique_ptr<R>(new R(object_, fn, std::move(argument))));
template <typename Fn>
void Call(Fn fn) const {
using Request = CallRequest<T, Fn>;
thread_->Post(std::unique_ptr<Request>(
new Request(object_id_, std::move(fn))));
}

T* get() const {
return object_;
template <typename Arg>
void Call(void (T::*fn)(Arg), Arg argument) const {
Call(std::bind(Apply<Arg>, std::placeholders::_1, fn, std::move(argument)));
}

private:
// This has to use non-const reference to support std::bind with non-copyable
// types
template <typename Argument>
static void Apply(T* target, void (T::*fn)(Argument),
/* NOLINT (runtime/references) */ Argument& argument) {
(target->*fn)(std::move(argument));
}

std::shared_ptr<MainThreadHandle> thread_;
T* object_;
const int object_id_;
};

class MainThreadSessionState {
public:
MainThreadSessionState(
std::shared_ptr<MainThreadHandle> thread,
bool prevent_shutdown) : thread_(thread),
prevent_shutdown_(prevent_shutdown) {}
MainThreadSessionState(MainThreadInterface* thread, bool prevent_shutdown)
: thread_(thread),
prevent_shutdown_(prevent_shutdown) {}

static std::unique_ptr<MainThreadSessionState> Create(
MainThreadInterface* thread, bool prevent_shutdown) {
return std::unique_ptr<MainThreadSessionState>(
new MainThreadSessionState(thread, prevent_shutdown));
}

void Connect(std::unique_ptr<InspectorSessionDelegate> delegate) {
Agent* agent = thread_->GetInspectorAgent();
Agent* agent = thread_->inspector_agent();
if (agent != nullptr)
session_ = agent->Connect(std::move(delegate), prevent_shutdown_);
}
Expand All @@ -136,7 +171,7 @@ class MainThreadSessionState {
}

private:
std::shared_ptr<MainThreadHandle> thread_;
MainThreadInterface* thread_;
bool prevent_shutdown_;
std::unique_ptr<InspectorSession> session_;
};
Expand All @@ -148,12 +183,14 @@ class CrossThreadInspectorSession : public InspectorSession {
std::shared_ptr<MainThreadHandle> thread,
std::unique_ptr<InspectorSessionDelegate> delegate,
bool prevent_shutdown)
: state_(thread, new MainThreadSessionState(thread, prevent_shutdown)) {
state_.Post(&MainThreadSessionState::Connect, std::move(delegate));
: state_(thread, std::bind(MainThreadSessionState::Create,
std::placeholders::_1,
prevent_shutdown)) {
state_.Call(&MainThreadSessionState::Connect, std::move(delegate));
}

void Dispatch(const StringView& message) override {
state_.Post(&MainThreadSessionState::Dispatch,
state_.Call(&MainThreadSessionState::Dispatch,
StringBuffer::create(message));
}

Expand All @@ -163,13 +200,15 @@ class CrossThreadInspectorSession : public InspectorSession {

class ThreadSafeDelegate : public InspectorSessionDelegate {
public:
ThreadSafeDelegate(std::shared_ptr<MainThreadHandle> thread,
std::unique_ptr<InspectorSessionDelegate> delegate)
: thread_(thread), delegate_(thread, delegate.release()) {}
ThreadSafeDelegate(std::shared_ptr<MainThreadHandle> thread, int object_id)
: thread_(thread), delegate_(thread, object_id) {}

void SendMessageToFrontend(const v8_inspector::StringView& message) override {
thread_->Post(std::unique_ptr<Request>(
new PostMessageRequest(delegate_.get(), message)));
delegate_.Call(
[m = StringBuffer::create(message)]
(InspectorSessionDelegate* delegate) {
delegate->SendMessageToFrontend(m->string());
});
}

private:
Expand Down Expand Up @@ -252,7 +291,7 @@ void MainThreadInterface::DispatchMessages() {
MessageQueue::value_type task;
std::swap(dispatching_message_queue_.front(), task);
dispatching_message_queue_.pop_front();
task->Call();
task->Call(this);
}
} while (had_messages);
dispatching_messages_ = false;
Expand All @@ -264,6 +303,26 @@ std::shared_ptr<MainThreadHandle> MainThreadInterface::GetHandle() {
return handle_;
}

void MainThreadInterface::AddObject(int id,
std::unique_ptr<Deletable> object) {
CHECK_NE(nullptr, object);
managed_objects_[id] = std::move(object);
}

void MainThreadInterface::RemoveObject(int id) {
CHECK_EQ(1, managed_objects_.erase(id));
}

Deletable* MainThreadInterface::GetObject(int id) {
auto iterator = managed_objects_.find(id);
// This would mean the object is requested after it was disposed, which is
// a coding error.
CHECK_NE(managed_objects_.end(), iterator);
Deletable* pointer = iterator->second.get();
CHECK_NE(nullptr, pointer);
return pointer;
}

std::unique_ptr<StringBuffer> Utf8ToStringView(const std::string& message) {
icu::UnicodeString utf16 = icu::UnicodeString::fromUTF8(
icu::StringPiece(message.data(), message.length()));
Expand Down Expand Up @@ -303,10 +362,12 @@ Agent* MainThreadHandle::GetInspectorAgent() {
}

std::unique_ptr<InspectorSessionDelegate>
MainThreadHandle::MakeThreadSafeDelegate(
MainThreadHandle::MakeDelegateThreadSafe(
std::unique_ptr<InspectorSessionDelegate> delegate) {
int id = newObjectId();
main_thread_->AddObject(id, WrapInDeletable(std::move(delegate)));
return std::unique_ptr<InspectorSessionDelegate>(
new ThreadSafeDelegate(shared_from_this(), std::move(delegate)));
new ThreadSafeDelegate(shared_from_this(), id));
}

bool MainThreadHandle::Expired() {
Expand Down
25 changes: 20 additions & 5 deletions src/inspector/main_thread_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
#include "inspector_agent.h"
#include "node_mutex.h"

#include <atomic>
#include <deque>
#include <memory>
#include <unordered_map>
#include <unordered_set>

namespace v8_inspector {
class StringBuffer;
Expand All @@ -21,31 +21,41 @@ class StringView;

namespace node {
namespace inspector {
class MainThreadInterface;

class Request {
public:
virtual void Call() = 0;
virtual void Call(MainThreadInterface*) = 0;
virtual ~Request() {}
};

class Deletable {
public:
virtual ~Deletable() {}
};

std::unique_ptr<v8_inspector::StringBuffer> Utf8ToStringView(
const std::string& message);

using MessageQueue = std::deque<std::unique_ptr<Request>>;
class MainThreadInterface;

class MainThreadHandle : public std::enable_shared_from_this<MainThreadHandle> {
public:
explicit MainThreadHandle(MainThreadInterface* main_thread)
: main_thread_(main_thread) {}
: main_thread_(main_thread) {
}
~MainThreadHandle() {
CHECK_NULL(main_thread_); // main_thread_ should have called Reset
}
std::unique_ptr<InspectorSession> Connect(
std::unique_ptr<InspectorSessionDelegate> delegate,
bool prevent_shutdown);
int newObjectId() {
return ++next_object_id_;
}
bool Post(std::unique_ptr<Request> request);
Agent* GetInspectorAgent();
std::unique_ptr<InspectorSessionDelegate> MakeThreadSafeDelegate(
std::unique_ptr<InspectorSessionDelegate> MakeDelegateThreadSafe(
std::unique_ptr<InspectorSessionDelegate> delegate);
bool Expired();

Expand All @@ -55,6 +65,7 @@ class MainThreadHandle : public std::enable_shared_from_this<MainThreadHandle> {
MainThreadInterface* main_thread_;
Mutex block_lock_;
int next_session_id_ = 0;
std::atomic_int next_object_id_ = {1};

friend class MainThreadInterface;
};
Expand All @@ -72,6 +83,9 @@ class MainThreadInterface {
Agent* inspector_agent() {
return agent_;
}
void AddObject(int handle, std::unique_ptr<Deletable> object);
Deletable* GetObject(int id);
void RemoveObject(int handle);

private:
using AsyncAndInterface = std::pair<uv_async_t, MainThreadInterface*>;
Expand All @@ -92,6 +106,7 @@ class MainThreadInterface {
v8::Platform* const platform_;
DeleteFnPtr<AsyncAndInterface, CloseAsync> main_thread_request_;
std::shared_ptr<MainThreadHandle> handle_;
std::unordered_map<int, std::unique_ptr<Deletable>> managed_objects_;
};

} // namespace inspector
Expand Down

0 comments on commit 7404a4d

Please sign in to comment.