Skip to content

Commit

Permalink
worker: use fake MessageEvent for port.onmessage
Browse files Browse the repository at this point in the history
Instead of passing the payload for Workers directly to `.onmessage`,
perform something more similar to what the browser API provides,
namely create an event object with a `.data` property.

This does not make `MessagePort` implement the `EventTarget` API, nor
does it implement the full `MessageEvent` API, but it would make
such extensions non-breaking changes if we desire them at
some point in the future.

(This would be a breaking change if Workers were not experimental.
Currently, this method is also undocumented and only exists with
the idea of enabling some degree of Web compatibility.)

PR-URL: #26082
Reviewed-By: Gus Caplan <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Denys Otrishko <[email protected]>
  • Loading branch information
addaleax authored and rvagg committed Feb 28, 2019
1 parent f408d78 commit 77a944c
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 13 deletions.
8 changes: 4 additions & 4 deletions lib/internal/worker/io.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ MessagePort.prototype.unref = MessagePortPrototype.unref;
// uv_async_t) which can receive information from other threads and emits
// .onmessage events, and a function used for sending data to a MessagePort
// in some other thread.
MessagePort.prototype[kOnMessageListener] = function onmessage(payload) {
if (payload.type !== messageTypes.STDIO_WANTS_MORE_DATA)
debug(`[${threadId}] received message`, payload);
MessagePort.prototype[kOnMessageListener] = function onmessage(event) {
if (event.data && event.data.type !== messageTypes.STDIO_WANTS_MORE_DATA)
debug(`[${threadId}] received message`, event);
// Emit the deserialized object to userland.
this.emit('message', payload);
this.emit('message', event.data);
};

// This is for compatibility with the Web's MessagePort API. It makes sense to
Expand Down
3 changes: 3 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2;
V(crypto_ec_string, "ec") \
V(crypto_rsa_string, "rsa") \
V(cwd_string, "cwd") \
V(data_string, "data") \
V(dest_string, "dest") \
V(destroyed_string, "destroyed") \
V(detached_string, "detached") \
Expand Down Expand Up @@ -291,6 +292,7 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2;
V(subject_string, "subject") \
V(subjectaltname_string, "subjectaltname") \
V(syscall_string, "syscall") \
V(target_string, "target") \
V(thread_id_string, "threadId") \
V(ticketkeycallback_string, "onticketkeycallback") \
V(timeout_string, "timeout") \
Expand Down Expand Up @@ -359,6 +361,7 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2;
V(inspector_console_extension_installer, v8::Function) \
V(libuv_stream_wrap_ctor_template, v8::FunctionTemplate) \
V(message_port, v8::Object) \
V(message_event_object_template, v8::ObjectTemplate) \
V(message_port_constructor_template, v8::FunctionTemplate) \
V(native_module_require, v8::Function) \
V(performance_entry_callback, v8::Function) \
Expand Down
27 changes: 22 additions & 5 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ using v8::Maybe;
using v8::MaybeLocal;
using v8::Nothing;
using v8::Object;
using v8::ObjectTemplate;
using v8::SharedArrayBuffer;
using v8::String;
using v8::Value;
Expand Down Expand Up @@ -589,12 +590,19 @@ void MessagePort::OnMessage() {
// Call the JS .onmessage() callback.
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(context);
Local<Value> args[] = {
received.Deserialize(env(), context).FromMaybe(Local<Value>())
};

if (args[0].IsEmpty() ||
MakeCallback(env()->onmessage_string(), 1, args).IsEmpty()) {
Local<Object> event;
Local<Value> payload;
Local<Value> cb_args[1];
if (!received.Deserialize(env(), context).ToLocal(&payload) ||
!env()->message_event_object_template()->NewInstance(context)
.ToLocal(&event) ||
event->Set(context, env()->data_string(), payload).IsNothing() ||
event->Set(context, env()->target_string(), object()).IsNothing() ||
(cb_args[0] = event, false) ||
MakeCallback(env()->onmessage_string(),
arraysize(cb_args),
cb_args).IsEmpty()) {
// Re-schedule OnMessage() execution in case of failure.
if (data_)
TriggerAsync();
Expand Down Expand Up @@ -763,6 +771,8 @@ MaybeLocal<Function> GetMessagePortConstructor(
if (!templ.IsEmpty())
return templ->GetFunction(context);

Isolate* isolate = env->isolate();

{
Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
m->SetClassName(env->message_port_constructor_string());
Expand All @@ -775,6 +785,13 @@ MaybeLocal<Function> GetMessagePortConstructor(
env->SetProtoMethod(m, "drain", MessagePort::Drain);

env->set_message_port_constructor_template(m);

Local<FunctionTemplate> event_ctor = FunctionTemplate::New(isolate);
event_ctor->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "MessageEvent"));
Local<ObjectTemplate> e = event_ctor->InstanceTemplate();
e->Set(env->data_string(), Null(isolate));
e->Set(env->target_string(), Null(isolate));
env->set_message_event_object_template(e);
}

return GetMessagePortConstructor(env, context);
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-worker-message-port-transfer-self.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ assert.throws(common.mustCall(() => {

// The failed transfer should not affect the ports in anyway.
port2.onmessage = common.mustCall((message) => {
assert.strictEqual(message, 2);
assert.strictEqual(message.data, 2);

const inspectedPort1 = util.inspect(port1);
const inspectedPort2 = util.inspect(port2);
Expand Down
5 changes: 3 additions & 2 deletions test/parallel/test-worker-message-port.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ const { MessageChannel, MessagePort } = require('worker_threads');
const { port1, port2 } = new MessageChannel();

port1.onmessage = common.mustCall((message) => {
assert.strictEqual(message, 4);
assert.strictEqual(message.data, 4);
assert.strictEqual(message.target, port1);
port2.close(common.mustCall());
});

port1.postMessage(2);

port2.onmessage = common.mustCall((message) => {
port2.postMessage(message * 2);
port2.postMessage(message.data * 2);
});
}

Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-worker-onmessage.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ if (!process.env.HAS_STARTED_WORKER) {
w.postMessage(2);
} else {
parentPort.onmessage = common.mustCall((message) => {
parentPort.postMessage(message * 2);
parentPort.postMessage(message.data * 2);
});
}

0 comments on commit 77a944c

Please sign in to comment.