diff --git a/src/node.cc b/src/node.cc index 784dd157f14314..266c276d3a35d8 100644 --- a/src/node.cc +++ b/src/node.cc @@ -434,8 +434,7 @@ static struct { tracing_file_writer_ = tracing_agent_->AddClient( ParseCommaSeparatedSet(trace_enabled_categories), std::unique_ptr( - new tracing::NodeTraceWriter(trace_file_pattern, - tracing_agent_->loop())), + new tracing::NodeTraceWriter(trace_file_pattern)), tracing::Agent::kUseDefaultCategories); } } diff --git a/src/tracing/agent.cc b/src/tracing/agent.cc index ad842db63630dc..9b435b56d2c8ad 100644 --- a/src/tracing/agent.cc +++ b/src/tracing/agent.cc @@ -53,9 +53,27 @@ Agent::Agent() { tracing_controller_->Initialize(nullptr); CHECK_EQ(uv_loop_init(&tracing_loop_), 0); + CHECK_EQ(uv_async_init(&tracing_loop_, + &initialize_writer_async_, + [](uv_async_t* async) { + Agent* agent = ContainerOf(&Agent::initialize_writer_async_, async); + agent->InitializeWritersOnThread(); + }), 0); +} + +void Agent::InitializeWritersOnThread() { + Mutex::ScopedLock lock(initialize_writer_mutex_); + while (!to_be_initialized_.empty()) { + AsyncTraceWriter* head = *to_be_initialized_.begin(); + head->InitializeOnThread(&tracing_loop_); + to_be_initialized_.erase(head); + } + initialize_writer_condvar_.Broadcast(lock); } Agent::~Agent() { + uv_close(reinterpret_cast(&initialize_writer_async_), nullptr); + uv_run(&tracing_loop_, UV_RUN_ONCE); CheckedUvLoopClose(&tracing_loop_); } @@ -95,9 +113,18 @@ AgentWriterHandle Agent::AddClient( ScopedSuspendTracing suspend(tracing_controller_, this); int id = next_writer_id_++; + AsyncTraceWriter* raw = writer.get(); writers_[id] = std::move(writer); categories_[id] = { use_categories->begin(), use_categories->end() }; + { + Mutex::ScopedLock lock(initialize_writer_mutex_); + to_be_initialized_.insert(raw); + uv_async_send(&initialize_writer_async_); + while (to_be_initialized_.count(raw) > 0) + initialize_writer_condvar_.Wait(lock); + } + return AgentWriterHandle(this, id); } @@ -120,6 +147,10 @@ void Agent::StopTracing() { void Agent::Disconnect(int client) { if (client == kDefaultHandleId) return; + { + Mutex::ScopedLock lock(initialize_writer_mutex_); + to_be_initialized_.erase(writers_[client].get()); + } ScopedSuspendTracing suspend(tracing_controller_, this); writers_.erase(client); categories_.erase(client); diff --git a/src/tracing/agent.h b/src/tracing/agent.h index e8bf727c505ce3..045aaef85e8ea6 100644 --- a/src/tracing/agent.h +++ b/src/tracing/agent.h @@ -5,6 +5,7 @@ #include "uv.h" #include "v8.h" #include "util.h" +#include "node_mutex.h" #include #include @@ -23,6 +24,7 @@ class AsyncTraceWriter { virtual ~AsyncTraceWriter() {} virtual void AppendTraceEvent(TraceObject* trace_event) = 0; virtual void Flush(bool blocking) = 0; + virtual void InitializeOnThread(uv_loop_t* loop) {} }; class TracingController : public v8::platform::tracing::TracingController { @@ -92,13 +94,11 @@ class Agent { TraceConfig* CreateTraceConfig() const; - // TODO(addaleax): This design is broken and inherently thread-unsafe. - inline uv_loop_t* loop() { return &tracing_loop_; } - private: friend class AgentWriterHandle; static void ThreadCb(void* arg); + void InitializeWritersOnThread(); void Start(); void StopTracing(); @@ -120,6 +120,13 @@ class Agent { std::unordered_map> categories_; std::unordered_map> writers_; TracingController* tracing_controller_ = nullptr; + + // Variables related to initializing per-event-loop properties of individual + // writers, such as libuv handles. + Mutex initialize_writer_mutex_; + ConditionVariable initialize_writer_condvar_; + uv_async_t initialize_writer_async_; + std::set to_be_initialized_; }; void AgentWriterHandle::reset() { diff --git a/src/tracing/node_trace_writer.cc b/src/tracing/node_trace_writer.cc index 5e3ddc633f153f..2fdb92972330b5 100644 --- a/src/tracing/node_trace_writer.cc +++ b/src/tracing/node_trace_writer.cc @@ -3,16 +3,25 @@ #include #include -#include "util.h" +#include "util-inl.h" namespace node { namespace tracing { -NodeTraceWriter::NodeTraceWriter(const std::string& log_file_pattern, - uv_loop_t* tracing_loop) - : tracing_loop_(tracing_loop), log_file_pattern_(log_file_pattern) { +NodeTraceWriter::NodeTraceWriter(const std::string& log_file_pattern) + : log_file_pattern_(log_file_pattern) {} + +void NodeTraceWriter::InitializeOnThread(uv_loop_t* loop) { + CHECK_NULL(tracing_loop_); + tracing_loop_ = loop; + flush_signal_.data = this; - int err = uv_async_init(tracing_loop_, &flush_signal_, FlushSignalCb); + int err = uv_async_init(tracing_loop_, &flush_signal_, + [](uv_async_t* signal) { + NodeTraceWriter* trace_writer = + ContainerOf(&NodeTraceWriter::flush_signal_, signal); + trace_writer->FlushPrivate(); + }); CHECK_EQ(err, 0); exit_signal_.data = this; @@ -126,11 +135,6 @@ void NodeTraceWriter::FlushPrivate() { WriteToFile(std::move(str), highest_request_id); } -void NodeTraceWriter::FlushSignalCb(uv_async_t* signal) { - NodeTraceWriter* trace_writer = static_cast(signal->data); - trace_writer->FlushPrivate(); -} - void NodeTraceWriter::Flush(bool blocking) { Mutex::ScopedLock scoped_lock(request_mutex_); if (!json_trace_writer_) { @@ -170,7 +174,7 @@ void NodeTraceWriter::WriteToFile(std::string&& str, int highest_request_id) { } void NodeTraceWriter::WriteCb(uv_fs_t* req) { - WriteRequest* write_req = reinterpret_cast(req); + WriteRequest* write_req = ContainerOf(&WriteRequest::req, req); CHECK_GE(write_req->req.result, 0); NodeTraceWriter* writer = write_req->writer; @@ -187,13 +191,15 @@ void NodeTraceWriter::WriteCb(uv_fs_t* req) { // static void NodeTraceWriter::ExitSignalCb(uv_async_t* signal) { - NodeTraceWriter* trace_writer = static_cast(signal->data); + NodeTraceWriter* trace_writer = + ContainerOf(&NodeTraceWriter::exit_signal_, signal); uv_close(reinterpret_cast(&trace_writer->flush_signal_), nullptr); uv_close(reinterpret_cast(&trace_writer->exit_signal_), [](uv_handle_t* signal) { NodeTraceWriter* trace_writer = - static_cast(signal->data); + ContainerOf(&NodeTraceWriter::exit_signal_, + reinterpret_cast(signal)); Mutex::ScopedLock scoped_lock(trace_writer->request_mutex_); trace_writer->exited_ = true; trace_writer->exit_cond_.Signal(scoped_lock); diff --git a/src/tracing/node_trace_writer.h b/src/tracing/node_trace_writer.h index b2d5e7912fa01d..9c4decc8eec3d4 100644 --- a/src/tracing/node_trace_writer.h +++ b/src/tracing/node_trace_writer.h @@ -4,7 +4,6 @@ #include #include -#include "node_mutex.h" #include "libplatform/v8-tracing.h" #include "tracing/agent.h" #include "uv.h" @@ -17,10 +16,10 @@ using v8::platform::tracing::TraceWriter; class NodeTraceWriter : public AsyncTraceWriter { public: - explicit NodeTraceWriter(const std::string& log_file_pattern, - uv_loop_t* tracing_loop); + explicit NodeTraceWriter(const std::string& log_file_pattern); ~NodeTraceWriter(); + void InitializeOnThread(uv_loop_t* loop) override; void AppendTraceEvent(TraceObject* trace_event) override; void Flush(bool blocking) override; @@ -38,11 +37,10 @@ class NodeTraceWriter : public AsyncTraceWriter { void OpenNewFileForStreaming(); void WriteToFile(std::string&& str, int highest_request_id); void WriteSuffix(); - static void FlushSignalCb(uv_async_t* signal); void FlushPrivate(); static void ExitSignalCb(uv_async_t* signal); - uv_loop_t* tracing_loop_; + uv_loop_t* tracing_loop_ = nullptr; // Triggers callback to initiate writing the contents of stream_ to disk. uv_async_t flush_signal_; // Triggers callback to close async objects, ending the tracing thread.