Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moving callbacks to into traits #525

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions ;
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use tokio::sync::Mutex;
use arc_swap::ArcSwapOption;

pub struct EventHandler<T: ?Sized> {
inner: ArcSwapOption<Mutex<Box<T>>>,
}

impl <T: ?Sized> EventHandler<T> {
fn empty() -> Self {
Self { inner: ArcSwapOption::empty() }
}

async fn load(&self) -> Option<&T> {

let guard = self.inner.load();
let handler = guard.as_ref()?.lock().await;
//Some(&*handler)
}
}

impl <T: ?Sized> Default for EventHandler<T> {
fn default() -> Self {
Self::empty()
}
}
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

169 changes: 103 additions & 66 deletions examples/examples/broadcast/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::io::Write;
use std::sync::Arc;
use std::sync::{Arc, Weak};

use anyhow::Result;
use clap::{AppSettings, Arg, Command};
use std::future::Future;
use tokio::time::Duration;
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::MediaEngine;
Expand All @@ -12,12 +13,94 @@ use webrtc::interceptor::registry::Registry;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc::peer_connection::{PeerConnectionEventHandler, RTCPeerConnection};
use webrtc::rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication;
use webrtc::rtp_transceiver::rtp_codec::RTPCodecType;
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
use webrtc::Error;

use webrtc::rtp_transceiver::rtp_receiver::RTCRtpReceiver;
use webrtc::rtp_transceiver::RTCRtpTransceiver;
use webrtc::track::track_remote::TrackRemote;

struct ConnectionHandler {
connection: Weak<RTCPeerConnection>,
local_track_chan_tx: Arc<tokio::sync::mpsc::Sender<Arc<TrackLocalStaticRTP>>>,
}

impl PeerConnectionEventHandler for ConnectionHandler {
// Set a handler for when a new remote track starts, this handler copies inbound RTP packets,
// replaces the SSRC and sends them back
fn on_track(
&mut self,
track: Arc<TrackRemote>,
_: Arc<RTCRtpReceiver>,
_: Arc<RTCRtpTransceiver>,
) -> impl Future<Output = ()> + Send {
async move {
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
let connection = self.connection.clone();
let media_ssrc = track.ssrc();
tokio::spawn(async move {
let mut result = Result::<usize>::Ok(0);
while result.is_ok() {
let timeout = tokio::time::sleep(Duration::from_secs(3));
tokio::pin!(timeout);

tokio::select! {
_ = timeout.as_mut() =>{
if let Some(pc) = connection.upgrade(){
result = pc.write_rtcp(&[Box::new(PictureLossIndication{
sender_ssrc: 0,
media_ssrc,
})]).await.map_err(Into::into);
}else{
break;
}
}
};
}
});

let local_track_chan_tx = self.local_track_chan_tx.clone();
tokio::spawn(async move {
// Create Track that we send video back to browser on
let local_track = Arc::new(TrackLocalStaticRTP::new(
track.codec().capability,
"video".to_owned(),
"webrtc-rs".to_owned(),
));
let _ = local_track_chan_tx.send(local_track.clone()).await;

// Read RTP packets being sent to webrtc-rs
while let Ok((rtp, _)) = track.read_rtp().await {
if let Err(err) = local_track.write_rtp(&rtp).await {
if Error::ErrClosedPipe != err {
print!("output track write_rtp got error: {err} and break");
break;
} else {
print!("output track write_rtp got error: {err}");
}
}
}
});
}
}

// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
fn on_peer_connection_state_change(
&mut self,
state: RTCPeerConnectionState,
) -> impl Future<Output = ()> + Send {
async move {
println!("Peer Connection State has changed: {state}");
}
}
}

#[tokio::main]
async fn main() -> Result<()> {
let mut app = Command::new("broadcast")
Expand Down Expand Up @@ -126,64 +209,11 @@ async fn main() -> Result<()> {
// Set a handler for when a new remote track starts, this handler copies inbound RTP packets,
// replaces the SSRC and sends them back
let pc = Arc::downgrade(&peer_connection);
peer_connection.on_track(Box::new(move |track, _, _| {
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
let media_ssrc = track.ssrc();
let pc2 = pc.clone();
tokio::spawn(async move {
let mut result = Result::<usize>::Ok(0);
while result.is_ok() {
let timeout = tokio::time::sleep(Duration::from_secs(3));
tokio::pin!(timeout);

tokio::select! {
_ = timeout.as_mut() =>{
if let Some(pc) = pc2.upgrade(){
result = pc.write_rtcp(&[Box::new(PictureLossIndication{
sender_ssrc: 0,
media_ssrc,
})]).await.map_err(Into::into);
}else{
break;
}
}
};
}
});

let local_track_chan_tx2 = Arc::clone(&local_track_chan_tx);
tokio::spawn(async move {
// Create Track that we send video back to browser on
let local_track = Arc::new(TrackLocalStaticRTP::new(
track.codec().capability,
"video".to_owned(),
"webrtc-rs".to_owned(),
));
let _ = local_track_chan_tx2.send(Arc::clone(&local_track)).await;

// Read RTP packets being sent to webrtc-rs
while let Ok((rtp, _)) = track.read_rtp().await {
if let Err(err) = local_track.write_rtp(&rtp).await {
if Error::ErrClosedPipe != err {
print!("output track write_rtp got error: {err} and break");
break;
} else {
print!("output track write_rtp got error: {err}");
}
}
}
});

Box::pin(async {})
}));

// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
println!("Peer Connection State has changed: {s}");
Box::pin(async {})
}));
peer_connection.with_event_handler(ConnectionHandler {
connection: pc,
local_track_chan_tx,
});

// Set the remote SessionDescription
peer_connection.set_remote_description(offer).await?;
Expand Down Expand Up @@ -264,14 +294,21 @@ async fn main() -> Result<()> {
Result::<()>::Ok(())
});

// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_peer_connection_state_change(Box::new(
move |s: RTCPeerConnectionState| {
println!("Peer Connection State has changed: {s}");
Box::pin(async {})
},
));
struct StateNotifier;

impl PeerConnectionEventHandler for StateNotifier {
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
fn on_peer_connection_state_change(
&mut self,
state: RTCPeerConnectionState,
) -> impl Future<Output = ()> + Send {
async move {
println!("Peer Connection State has changed: {state}");
}
}
}
peer_connection.with_event_handler(StateNotifier);

// Set the remote SessionDescription
peer_connection
Expand Down
Loading
Loading