Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: use special message as MessagePort close command #27705

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 31 additions & 33 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ namespace worker {
Message::Message(MallocedBuffer<char>&& buffer)
: main_message_buf_(std::move(buffer)) {}

bool Message::IsCloseMessage() const {
return main_message_buf_.data == nullptr;
}

namespace {

// This is used to tell V8 how to read transferred host objects, like other
Expand Down Expand Up @@ -91,6 +95,8 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {

MaybeLocal<Value> Message::Deserialize(Environment* env,
Local<Context> context) {
CHECK(!IsCloseMessage());

EscapableHandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);

Expand Down Expand Up @@ -395,6 +401,7 @@ Maybe<bool> Message::Serialize(Environment* env,

// The serializer gave us a buffer allocated using `malloc()`.
std::pair<uint8_t*, size_t> data = serializer.Release();
CHECK_NOT_NULL(data.first);
main_message_buf_ =
MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
return Just(true);
Expand Down Expand Up @@ -430,11 +437,6 @@ void MessagePortData::AddToIncomingQueue(Message&& message) {
}
}

bool MessagePortData::IsSiblingClosed() const {
Mutex::ScopedLock lock(*sibling_mutex_);
return sibling_ == nullptr;
}

void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
CHECK_NULL(a->sibling_);
CHECK_NULL(b->sibling_);
Expand All @@ -443,12 +445,6 @@ void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
a->sibling_mutex_ = b->sibling_mutex_;
}

void MessagePortData::PingOwnerAfterDisentanglement() {
Mutex::ScopedLock lock(mutex_);
if (owner_ != nullptr)
owner_->TriggerAsync();
}

void MessagePortData::Disentangle() {
// Grab a copy of the sibling mutex, then replace it so that each sibling
// has its own sibling_mutex_ now.
Expand All @@ -462,11 +458,12 @@ void MessagePortData::Disentangle() {
sibling_ = nullptr;
}

// We close MessagePorts after disentanglement, so we trigger the
// corresponding uv_async_t to let them know that this happened.
PingOwnerAfterDisentanglement();
// We close MessagePorts after disentanglement, so we enqueue a corresponding
// message and trigger the corresponding uv_async_t to let them know that
// this happened.
AddToIncomingQueue(Message());
if (sibling != nullptr) {
sibling->PingOwnerAfterDisentanglement();
sibling->AddToIncomingQueue(Message());
}
}

Expand Down Expand Up @@ -588,16 +585,27 @@ void MessagePort::OnMessage() {
Mutex::ScopedLock lock(data_->mutex_);

Debug(this, "MessagePort has message, receiving = %d",
static_cast<int>(data_->receiving_messages_));

if (!data_->receiving_messages_)
break;
if (data_->incoming_messages_.empty())
static_cast<int>(receiving_messages_));

// We have nothing to do if:
// - There are no pending messages
// - We are not intending to receive messages, and the message we would
// receive is not the final "close" message.
if (data_->incoming_messages_.empty() ||
(!receiving_messages_ &&
!data_->incoming_messages_.front().IsCloseMessage())) {
break;
}

received = std::move(data_->incoming_messages_.front());
data_->incoming_messages_.pop_front();
}

if (received.IsCloseMessage()) {
Close();
return;
}

if (!env()->can_call_into_js()) {
Debug(this, "MessagePort drains queue because !can_call_into_js()");
// In this case there is nothing to do but to drain the current queue.
Expand Down Expand Up @@ -628,15 +636,6 @@ void MessagePort::OnMessage() {
}
}
}

if (data_ && data_->IsSiblingClosed()) {
Close();
}
}

bool MessagePort::IsSiblingClosed() const {
CHECK(data_);
return data_->IsSiblingClosed();
}

void MessagePort::OnClose() {
Expand Down Expand Up @@ -722,17 +721,16 @@ void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
}

void MessagePort::Start() {
Mutex::ScopedLock lock(data_->mutex_);
Debug(this, "Start receiving messages");
data_->receiving_messages_ = true;
receiving_messages_ = true;
Mutex::ScopedLock lock(data_->mutex_);
if (!data_->incoming_messages_.empty())
TriggerAsync();
}

void MessagePort::Stop() {
Mutex::ScopedLock lock(data_->mutex_);
Debug(this, "Stop receiving messages");
data_->receiving_messages_ = false;
receiving_messages_ = false;
}

void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
Expand Down
18 changes: 8 additions & 10 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@ class MessagePort;
// Represents a single communication message.
class Message : public MemoryRetainer {
public:
// Create a Message with a specific underlying payload, in the format of the
// V8 ValueSerializer API. If `payload` is empty, this message indicates
// that the receiving message port should close itself.
explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());

Message(Message&& other) = default;
Message& operator=(Message&& other) = default;
Message& operator=(const Message&) = delete;
Message(const Message&) = delete;

// Whether this is a message indicating that the port is to be closed.
// This is the last message to be received by a MessagePort.
bool IsCloseMessage() const;

// Deserialize the contained JS value. May only be called once, and only
// after Serialize() has been called (e.g. by another thread).
v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
Expand Down Expand Up @@ -89,10 +96,6 @@ class MessagePortData : public MemoryRetainer {
// This may be called from any thread.
void AddToIncomingQueue(Message&& message);

// Returns true if and only this MessagePort is currently not entangled
// with another message port.
bool IsSiblingClosed() const;

// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
static void Entangle(MessagePortData* a, MessagePortData* b);
Expand All @@ -109,14 +112,9 @@ class MessagePortData : public MemoryRetainer {
SET_SELF_SIZE(MessagePortData)

private:
// After disentangling this message port, the owner handle (if any)
// is asynchronously triggered, so that it can close down naturally.
void PingOwnerAfterDisentanglement();

// This mutex protects all fields below it, with the exception of
// sibling_.
mutable Mutex mutex_;
bool receiving_messages_ = false;
std::list<Message> incoming_messages_;
MessagePort* owner_ = nullptr;
// This mutex protects the sibling_ field and is shared between two entangled
Expand Down Expand Up @@ -179,7 +177,6 @@ class MessagePort : public HandleWrap {
// messages.
std::unique_ptr<MessagePortData> Detach();

bool IsSiblingClosed() const;
void Close(
v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override;

Expand All @@ -205,6 +202,7 @@ class MessagePort : public HandleWrap {
void TriggerAsync();

std::unique_ptr<MessagePortData> data_ = nullptr;
bool receiving_messages_ = false;
uv_async_t async_;

friend class MessagePortData;
Expand Down
2 changes: 0 additions & 2 deletions test/parallel/parallel.status
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ test-tls-enable-trace-cli: PASS,FLAKY

[$system==win32]
test-http2-pipe: PASS,FLAKY
test-worker-syntax-error: PASS,FLAKY
test-worker-syntax-error-file: PASS,FLAKY
# https://github.com/nodejs/node/issues/23277
test-worker-memory: PASS,FLAKY
# https://github.com/nodejs/node/issues/20750
Expand Down
38 changes: 38 additions & 0 deletions test/parallel/test-worker-message-port-message-before-close.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { once } = require('events');
const { Worker, MessageChannel } = require('worker_threads');

// This is a regression test for the race condition underlying
// https://github.com/nodejs/node/issues/22762.
// It ensures that all messages send before a MessagePort#close() call are
// received. Previously, what could happen was a race condition like this:
// - Thread 1 sends message A
// - Thread 2 begins receiving/emitting message A
// - Thread 1 sends message B
// - Thread 1 closes its side of the channel
// - Thread 2 finishes receiving/emitting message A
// - Thread 2 sees that the port should be closed
// - Thread 2 closes the port, discarding message B in the process.

async function test() {
const worker = new Worker(`
require('worker_threads').parentPort.on('message', ({ port }) => {
port.postMessage('firstMessage');
port.postMessage('lastMessage');
port.close();
});
`, { eval: true });

for (let i = 0; i < 10000; i++) {
const { port1, port2 } = new MessageChannel();
worker.postMessage({ port: port2 }, [ port2 ]);
await once(port1, 'message'); // 'complexObject'
assert.deepStrictEqual(await once(port1, 'message'), ['lastMessage']);
}

worker.terminate();
}

test().then(common.mustCall());