Skip to content

Commit

Permalink
It compiles (TM), more work to do
Browse files Browse the repository at this point in the history
  • Loading branch information
DougAnderson444 committed Jul 24, 2023
1 parent 1790f01 commit 5d34fd3
Show file tree
Hide file tree
Showing 8 changed files with 768 additions and 2 deletions.
50 changes: 50 additions & 0 deletions transports/webrtc-websys/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
[package]
name = "libp2p-webrtc-websys"
version = "0.1.0"
edition = "2021"
rust-version.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures = "0.3.28"
callback-future = "0.1.0" # because web_sys::Rtc is callback based, but we need futures
js-sys = "0.3.64"
libp2p-core = { workspace = true }
libp2p-identity = { workspace = true }
libp2p-noise = { workspace = true }
log = "0.4.19"
hex = "0.4"
getrandom = { version = "0.2.9", features = ["js"] }
multiaddr = { workspace = true }
multihash = { workspace = true }
regex = "1.9.1"
send_wrapper = { version = "0.6.0", features = ["futures"] }
thiserror = "1.0.43"
serde_json = "1.0.103"
sha2 = "0.10.7"
serde = { version = "1.0", features = ["derive"] }
tinytemplate = "1.2.1" # used for the SDP creation
wasm-bindgen = "0.2.87"
wasm-bindgen-futures = "0.4.37"
web-sys = { version = "0.3.64", features = [
"MessageEvent",
"RtcPeerConnection",
"RtcSignalingState",
"RtcSdpType",
"RtcSessionDescription",
"RtcSessionDescriptionInit",
"RtcPeerConnectionIceEvent",
"RtcIceCandidate",
"RtcDataChannel",
"RtcDataChannelEvent",
"RtcCertificate",
"RtcConfiguration",
"RtcDataChannelInit",
"RtcDataChannelType",
"RtcDataChannelState",
] }

[dev-dependencies]
anyhow = "1.0"
hex-literal = "0.4"
56 changes: 56 additions & 0 deletions transports/webrtc-websys/src/cbfutures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};

#[derive(Clone, Debug)]
pub struct CbFuture<T>(Rc<CallbackFutureInner<T>>);

struct CallbackFutureInner<T> {
waker: Cell<Option<Waker>>,
result: Cell<Option<T>>,
}

impl<T> std::fmt::Debug for CallbackFutureInner<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CallbackFutureInner").finish()
}
}

impl<T> Default for CallbackFutureInner<T> {
fn default() -> Self {
Self {
waker: Cell::new(None),
result: Cell::new(None),
}
}
}

impl<T> CbFuture<T> {
/// New Callback Future
pub fn new() -> Self {
Self(Rc::new(CallbackFutureInner::<T>::default()))
}

// call this from your callback
pub fn publish(&self, result: T) {
self.0.result.set(Some(result));
if let Some(w) = self.0.waker.take() {
w.wake()
};
}
}

impl<T> Future for CbFuture<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.0.result.take() {
Some(x) => Poll::Ready(x),
None => {
self.0.waker.set(Some(cx.waker().clone()));
Poll::Pending
}
}
}
}
218 changes: 218 additions & 0 deletions transports/webrtc-websys/src/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
//! Websys WebRTC Connection
//!
use crate::cbfutures::CbFuture;
use crate::fingerprint::Fingerprint;
use crate::stream::DataChannelConfig;
use crate::upgrade::{self};
use crate::utils;
use crate::{Error, WebRTCStream};
use futures::FutureExt;
use js_sys::Object;
use js_sys::Reflect;
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_identity::{Keypair, PeerId};
use send_wrapper::SendWrapper;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use wasm_bindgen_futures::JsFuture;
use web_sys::{RtcConfiguration, RtcDataChannel, RtcDataChannelInit, RtcPeerConnection};

pub const SHA2_256: u64 = 0x12;
pub const SHA2_512: u64 = 0x13;

pub struct Connection {
// Swarm needs all types to be Send. WASM is single-threaded
// and it is safe to use SendWrapper.
inner: SendWrapper<ConnectionInner>,
}

struct ConnectionInner {
peer_connection: Option<RtcPeerConnection>,
sock_addr: SocketAddr,
remote_fingerprint: Fingerprint,
id_keys: Keypair,
create_data_channel_cbfuture: CbFuture<RtcDataChannel>,
closed: bool,
}

impl Connection {
/// Create a new Connection
pub fn new(sock_addr: SocketAddr, remote_fingerprint: Fingerprint, id_keys: Keypair) -> Self {
Self {
inner: SendWrapper::new(ConnectionInner::new(sock_addr, remote_fingerprint, id_keys)),
}
}

/// Connect
pub async fn connect(&mut self) -> Result<PeerId, Error> {
let fut = SendWrapper::new(self.inner.connect());
fut.await
}

/// Peer Connection Getter
pub fn peer_connection(&self) -> Option<&RtcPeerConnection> {
self.inner.peer_connection.as_ref()
}
}

impl ConnectionInner {
pub fn new(sock_addr: SocketAddr, remote_fingerprint: Fingerprint, id_keys: Keypair) -> Self {
Self {
peer_connection: None,
sock_addr,
remote_fingerprint,
id_keys,
create_data_channel_cbfuture: CbFuture::new(),
closed: false,
}
}

pub async fn connect(&mut self) -> Result<PeerId, Error> {
let hash = match self.remote_fingerprint.to_multihash().code() {
SHA2_256 => "sha-256",
SHA2_512 => "sha2-512",
_ => return Err(Error::JsError("unsupported hash".to_string())),
};

// let keygen_algorithm = json!({
// "name": "ECDSA",
// "namedCurve": "P-256",
// "hash": hash
// });

let algo: js_sys::Object = Object::new();
Reflect::set(&algo, &"name".into(), &"ECDSA".into()).unwrap();
Reflect::set(&algo, &"namedCurve".into(), &"P-256".into()).unwrap();
Reflect::set(&algo, &"hash".into(), &hash.into()).unwrap();

let certificate_promise = RtcPeerConnection::generate_certificate_with_object(&algo)
.expect("certificate to be valid");

let certificate = JsFuture::from(certificate_promise).await?; // Needs to be Send

let mut config = RtcConfiguration::new();
config.certificates(&certificate);

let peer_connection = web_sys::RtcPeerConnection::new_with_configuration(&config)?;

let ufrag = format!("libp2p+webrtc+v1/{}", utils::gen_ufrag(32));
/*
* OFFER
*/
let offer = JsFuture::from(peer_connection.create_offer()).await?; // Needs to be Send
let offer_obj = crate::sdp::offer(offer, &ufrag);
let sld_promise = peer_connection.set_local_description(&offer_obj);
JsFuture::from(sld_promise).await?;

/*
* ANSWER
*/
let answer_obj = crate::sdp::answer(self.sock_addr, &self.remote_fingerprint, &ufrag);
let srd_promise = peer_connection.set_remote_description(&answer_obj);
JsFuture::from(srd_promise).await?;

let peer_id = upgrade::outbound(
&peer_connection,
self.id_keys.clone(),
self.remote_fingerprint,
)
.await?;

self.peer_connection = Some(peer_connection);
Ok(peer_id)
}

/// Initiates and polls a future from `create_data_channel`.
fn poll_create_data_channel(
&mut self,
cx: &mut Context,
config: DataChannelConfig,
) -> Poll<Result<WebRTCStream, Error>> {
// Create Data Channel
// take the peer_connection and DataChannelConfig and create a pollable future
let mut dc =
crate::stream::create_data_channel(&self.peer_connection.as_ref().unwrap(), config);

let val = ready!(dc.poll_unpin(cx));

let channel = WebRTCStream::new(val);

Poll::Ready(Ok(channel))
}

/// Polls Incoming Peer Connections? Or Data Channels?
pub fn poll_incoming(&mut self, cx: &mut Context) -> Poll<Result<WebRTCStream, Error>> {
let mut dc = crate::stream::create_data_channel(
&self.peer_connection.as_ref().unwrap(),
DataChannelConfig::default(),
);

let val = ready!(dc.poll_unpin(cx));

let channel = WebRTCStream::new(val);

Poll::Ready(Ok(channel))
}

/// Closes the Peer Connection.
///
/// This closes the data channels also and they will return an error
/// if they are used.
fn close_connection(&mut self) {
match (&self.peer_connection, self.closed) {
(Some(conn), false) => {
conn.close();
self.closed = true;
}
_ => (),
}
}
}

impl Drop for ConnectionInner {
fn drop(&mut self) {
self.close_connection();
}
}

/// WebRTC native multiplexing
/// Allows users to open substreams
impl StreamMuxer for Connection {
type Substream = WebRTCStream; // A Substream of a WebRTC PeerConnection is a Data Channel
type Error = Error;

fn poll_inbound(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
// Inbound substreams for Browser WebRTC?
// Can only be done through a relayed connection
self.inner.poll_incoming(cx)
}

fn poll_outbound(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
// Since this is not a initial handshake outbound request (ie. Dialer)
// we need to create a new Data Channel without negotiated flag set to true
let config = DataChannelConfig::default();
self.inner.poll_create_data_channel(cx, config)
}

fn poll_close(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.inner.close_connection();
Poll::Ready(Ok(()))
}

fn poll(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
}
}
8 changes: 7 additions & 1 deletion transports/webrtc-websys/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,15 @@ impl Error {
}
}

// implement `std::convert::From<wasm_bindgen::JsValue>` for `error::Error`
impl std::convert::From<wasm_bindgen::JsValue> for Error {
fn from(value: JsValue) -> Self {
Error::from_js_value(value)
}
}

// impl From String
impl From<String> for Error {
fn from(value: String) -> Self {
Error::JsError(value)
}
}
Loading

0 comments on commit 5d34fd3

Please sign in to comment.