Skip to content

Commit

Permalink
feat(extensions): BroadcastChannel WPT conformance
Browse files Browse the repository at this point in the history
Replaces the file-backed provider by an in-memory one because proper
file locking is a hard problem that detracts from the proof of concept.

Teach the WPT runner how to extract tests from .html files because all
the relevant tests in test_util/wpt/webmessaging/broadcastchannel are
inside basics.html and interface.html.
  • Loading branch information
bnoordhuis committed May 16, 2021
1 parent a95b030 commit 7735044
Show file tree
Hide file tree
Showing 15 changed files with 300 additions and 110 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ fn create_web_worker_callback(
no_color: !colors::use_color(),
get_error_class_fn: Some(&crate::errors::get_error_class_name),
blob_url_store: program_state.blob_url_store.clone(),
broadcast_channel: program_state.broadcast_channel.clone(),
};

let mut worker = WebWorker::from_options(
Expand Down Expand Up @@ -212,6 +213,7 @@ pub fn create_main_worker(
.join(checksum::gen(&[loc.to_string().as_bytes()]))
}),
blob_url_store: program_state.blob_url_store.clone(),
broadcast_channel: program_state.broadcast_channel.clone(),
};

let mut worker = MainWorker::from_options(main_module, permissions, &options);
Expand Down
4 changes: 4 additions & 0 deletions cli/program_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::module_graph::TypeLib;
use crate::source_maps::SourceMapGetter;
use crate::specifier_handler::FetchHandler;
use crate::version;
use deno_runtime::deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_runtime::deno_file::BlobUrlStore;
use deno_runtime::inspector::InspectorServer;
use deno_runtime::permissions::Permissions;
Expand Down Expand Up @@ -52,6 +53,7 @@ pub struct ProgramState {
pub maybe_inspector_server: Option<Arc<InspectorServer>>,
pub ca_data: Option<Vec<u8>>,
pub blob_url_store: BlobUrlStore,
pub broadcast_channel: InMemoryBroadcastChannel,
}

impl ProgramState {
Expand All @@ -77,6 +79,7 @@ impl ProgramState {
};

let blob_url_store = BlobUrlStore::default();
let broadcast_channel = InMemoryBroadcastChannel::default();

let file_fetcher = FileFetcher::new(
http_cache,
Expand Down Expand Up @@ -143,6 +146,7 @@ impl ProgramState {
maybe_inspector_server,
ca_data,
blob_url_store,
broadcast_channel,
};
Ok(Arc::new(program_state))
}
Expand Down
3 changes: 3 additions & 0 deletions cli/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use deno_core::v8_set_flags;
use deno_core::ModuleLoader;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
use deno_runtime::deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_runtime::deno_file::BlobUrlStore;
use deno_runtime::permissions::Permissions;
use deno_runtime::permissions::PermissionsOptions;
Expand Down Expand Up @@ -160,6 +161,7 @@ pub async fn run(
let main_module = resolve_url(SPECIFIER)?;
let permissions = Permissions::from_options(&metadata.permissions);
let blob_url_store = BlobUrlStore::default();
let broadcast_channel = InMemoryBroadcastChannel::default();
let module_loader = Rc::new(EmbeddedModuleLoader(source_code));
let create_web_worker_cb = Arc::new(|_| {
todo!("Worker are currently not supported in standalone binaries");
Expand Down Expand Up @@ -193,6 +195,7 @@ pub async fn run(
location: metadata.location,
location_data_dir: None,
blob_url_store,
broadcast_channel,
};
let mut worker =
MainWorker::from_options(main_module.clone(), permissions, &options);
Expand Down
99 changes: 73 additions & 26 deletions extensions/broadcast_channel/01_broadcast_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
((window) => {
const core = window.Deno.core;
const webidl = window.__bootstrap.webidl;
const { setTarget } = window.__bootstrap.event;

const handlerSymbol = Symbol("eventHandlers");
function makeWrappedHandler(handler) {
Expand All @@ -21,7 +22,10 @@
// HTML specification section 8.1.5.1
Object.defineProperty(emitter, `on${name}`, {
get() {
return this[handlerSymbol]?.get(name)?.handler;
// TODO(bnoordhuis) The "BroadcastChannel should have an onmessage
// event" WPT test expects that .onmessage !== undefined. Returning
// null makes it pass but is perhaps not exactly in the spirit.
return this[handlerSymbol]?.get(name)?.handler ?? null;
},
set(value) {
if (!this[handlerSymbol]) {
Expand All @@ -43,12 +47,52 @@

const _name = Symbol("[[name]]");
const _closed = Symbol("[[closed]]");
const _rid = Symbol("[[rid]]");

const channels = [];
let rid = -1;

async function recv() {
while (channels.length > 0) {
const message = await core.opAsync("op_broadcast_recv", rid);

if (message === null) {
break;
}

const { name, data } = message;
dispatch(null, name, data);
}

core.close(rid);
rid = -1;
}

function dispatch(source, name, data) {
data = new Uint8Array(data);

const location = window.location;
const origin = location.protocol + "//" + location.host;

for (const channel of channels) {
if (channel === source) continue; // Don't self-send.
if (channel[_name] !== name) continue;
if (channel[_closed]) continue;

queueMicrotask(() => {
if (channel[_closed]) return;
const event = new MessageEvent("message", {
data: core.deserialize(data), // TODO(bnoordhuis) Cache immutables.
origin,
});
setTarget(event, channel);
channel.dispatchEvent(event);
});
}
}

class BroadcastChannel extends EventTarget {
[_name];
[_closed] = false;
[_rid];

get name() {
return this[_name];
Expand All @@ -67,11 +111,16 @@
context: "Argument 1",
});

this[_rid] = core.opSync("op_broadcast_open", this[_name]);

this[webidl.brand] = webidl.brand;

this.#eventLoop();
channels.push(this);

if (rid === -1) {
// Create the rid immediately, otherwise there is a time window (and a
// race condition) where messages can get lost, because recv() is async.
rid = core.opSync("op_broadcast_subscribe");
recv();
}
}

postMessage(message) {
Expand All @@ -81,32 +130,30 @@
throw new DOMException("Already closed", "InvalidStateError");
}

core.opAsync("op_broadcast_send", this[_rid], core.serialize(message));
const prefix = "Failed to execute 'postMessage' on 'BroadcastChannel'";
webidl.requiredArguments(arguments.length, 1, { prefix });

if (typeof message === "function" || typeof message === "symbol") {
throw new DOMException("Uncloneable value", "DataCloneError");
}

const data = core.serialize(message);

// Send to other listeners in this VM.
dispatch(this, this[_name], data);

// Send to listeners in other VMs.
core.opAsync("op_broadcast_send", [rid, this[_name]], data);
}

close() {
webidl.assertBranded(this, BroadcastChannel);

this[_closed] = true;
core.close(this[_rid]);
}
const index = channels.indexOf(this);
if (index !== -1) channels.splice(index, 1);
if (channels.length === 0) core.opSync("op_broadcast_unsubscribe", rid);

async #eventLoop() {
while (!this[_closed]) {
const message = await core.opAsync(
"op_broadcast_next_event",
this[_rid],
);

if (message.length !== 0) {
const event = new MessageEvent("message", {
data: core.deserialize(message),
origin: window.location,
});
event.target = this;
this.dispatchEvent(event);
}
}
this[_closed] = true;
}
}

Expand Down
2 changes: 2 additions & 0 deletions extensions/broadcast_channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@ repository = "https://github.com/denoland/deno"
path = "lib.rs"

[dependencies]
async-trait = "0.1"
deno_core = { version = "0.87.0", path = "../../core" }
tokio = { version = "1.4.0", features = ["full"] }
uuid = { version = "0.8.2", features = ["v4"] }
97 changes: 97 additions & 0 deletions extensions/broadcast_channel/in_memory_broadcast_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.

use crate::BroadcastChannel;
use async_trait::async_trait;
use deno_core::error::AnyError;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use uuid::Uuid;

#[derive(Clone)]
pub struct InMemoryBroadcastChannel(Arc<Mutex<broadcast::Sender<Message>>>);

pub struct InMemoryBroadcastChannelResource {
rx: tokio::sync::Mutex<(
broadcast::Receiver<Message>,
mpsc::UnboundedReceiver<()>,
)>,
cancel_tx: mpsc::UnboundedSender<()>,
uuid: Uuid,
}

#[derive(Clone, Debug)]
struct Message {
name: Arc<String>,
data: Arc<Vec<u8>>,
uuid: Uuid,
}

impl Default for InMemoryBroadcastChannel {
fn default() -> Self {
let (tx, _) = broadcast::channel(256);
Self(Arc::new(Mutex::new(tx)))
}
}

#[async_trait]
impl BroadcastChannel for InMemoryBroadcastChannel {
type Resource = InMemoryBroadcastChannelResource;

fn subscribe(&self) -> Result<Self::Resource, AnyError> {
let (cancel_tx, cancel_rx) = mpsc::unbounded_channel();
let broadcast_rx = self.0.lock().unwrap().subscribe();
let rx = tokio::sync::Mutex::new((broadcast_rx, cancel_rx));
let uuid = Uuid::new_v4();
Ok(Self::Resource {
rx,
cancel_tx,
uuid,
})
}

fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError> {
Ok(resource.cancel_tx.send(())?)
}

async fn send(
&self,
resource: &Self::Resource,
name: String,
data: Vec<u8>,
) -> Result<(), AnyError> {
let name = Arc::new(name);
let data = Arc::new(data);
let uuid = resource.uuid;
self.0.lock().unwrap().send(Message { name, data, uuid })?;
Ok(())
}

async fn recv(
&self,
resource: &Self::Resource,
) -> Result<Option<crate::Message>, AnyError> {
let mut g = resource.rx.lock().await;
let (broadcast_rx, cancel_rx) = &mut *g;
loop {
let result = tokio::select! {
r = broadcast_rx.recv() => r,
_ = cancel_rx.recv() => return Ok(None),
};
use tokio::sync::broadcast::error::RecvError::*;
match result {
Ok(message) if message.uuid == resource.uuid => (), // Self-send.
Ok(Message { name, data, .. }) => {
let name = String::clone(&*name);
let data = Vec::clone(&*data);
return Ok(Some((name, data)));
}
Err(Closed) => return Ok(None),
Err(Lagged(_)) => (), // Backlogged, messages dropped.
}
}
}
}

impl deno_core::Resource for InMemoryBroadcastChannelResource {}
Loading

0 comments on commit 7735044

Please sign in to comment.