-
Notifications
You must be signed in to change notification settings - Fork 5.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(extensions): BroadcastChannel WPT conformance
Replaces the file-backed provider by an in-memory one because proper file locking is a hard problem that detracts from the proof of concept. Teach the WPT runner how to extract tests from .html files because all the relevant tests in test_util/wpt/webmessaging/broadcastchannel are inside basics.html and interface.html.
- Loading branch information
1 parent
422a9ab
commit c71d60a
Showing
15 changed files
with
292 additions
and
110 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
97 changes: 97 additions & 0 deletions
97
extensions/broadcast_channel/in_memory_broadcast_channel.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. | ||
|
||
use crate::BroadcastChannel; | ||
use async_trait::async_trait; | ||
use deno_core::error::AnyError; | ||
use std::sync::Arc; | ||
use std::sync::Mutex; | ||
use tokio::sync::broadcast; | ||
use tokio::sync::mpsc; | ||
use uuid::Uuid; | ||
|
||
#[derive(Clone)] | ||
pub struct InMemoryBroadcastChannel(Arc<Mutex<broadcast::Sender<Message>>>); | ||
|
||
pub struct InMemoryBroadcastChannelResource { | ||
rx: tokio::sync::Mutex<( | ||
broadcast::Receiver<Message>, | ||
mpsc::UnboundedReceiver<()>, | ||
)>, | ||
cancel_tx: mpsc::UnboundedSender<()>, | ||
uuid: Uuid, | ||
} | ||
|
||
#[derive(Clone, Debug)] | ||
struct Message { | ||
name: Arc<String>, | ||
data: Arc<Vec<u8>>, | ||
uuid: Uuid, | ||
} | ||
|
||
impl Default for InMemoryBroadcastChannel { | ||
fn default() -> Self { | ||
let (tx, _) = broadcast::channel(256); | ||
Self(Arc::new(Mutex::new(tx))) | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl BroadcastChannel for InMemoryBroadcastChannel { | ||
type Resource = InMemoryBroadcastChannelResource; | ||
|
||
fn subscribe(&self) -> Result<Self::Resource, AnyError> { | ||
let (cancel_tx, cancel_rx) = mpsc::unbounded_channel(); | ||
let broadcast_rx = self.0.lock().unwrap().subscribe(); | ||
let rx = tokio::sync::Mutex::new((broadcast_rx, cancel_rx)); | ||
let uuid = Uuid::new_v4(); | ||
Ok(Self::Resource { | ||
rx, | ||
cancel_tx, | ||
uuid, | ||
}) | ||
} | ||
|
||
fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError> { | ||
Ok(resource.cancel_tx.send(())?) | ||
} | ||
|
||
async fn send( | ||
&self, | ||
resource: &Self::Resource, | ||
name: String, | ||
data: Vec<u8>, | ||
) -> Result<(), AnyError> { | ||
let name = Arc::new(name); | ||
let data = Arc::new(data); | ||
let uuid = resource.uuid; | ||
self.0.lock().unwrap().send(Message { name, data, uuid })?; | ||
Ok(()) | ||
} | ||
|
||
async fn recv( | ||
&self, | ||
resource: &Self::Resource, | ||
) -> Result<Option<crate::Message>, AnyError> { | ||
let mut g = resource.rx.lock().await; | ||
let (broadcast_rx, cancel_rx) = &mut *g; | ||
loop { | ||
let result = tokio::select! { | ||
r = broadcast_rx.recv() => r, | ||
_ = cancel_rx.recv() => return Ok(None), | ||
}; | ||
use tokio::sync::broadcast::error::RecvError::*; | ||
match result { | ||
Ok(message) if message.uuid == resource.uuid => (), // Self-send. | ||
Ok(Message { name, data, .. }) => { | ||
let name = String::clone(&*name); | ||
let data = Vec::clone(&*data); | ||
return Ok(Some((name, data))); | ||
} | ||
Err(Closed) => return Ok(None), | ||
Err(Lagged(_)) => (), // Backlogged, messages dropped. | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl deno_core::Resource for InMemoryBroadcastChannelResource {} |
Oops, something went wrong.