From b2479307ce2889f8f80958924a385b2d3aa840ee Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 28 Sep 2024 08:43:54 +0300 Subject: [PATCH] Bring back RTP and RTCP notifications --- Cargo.lock | 4 ++-- rust/src/router/consumer.rs | 21 ++++++++++++++------- rust/src/router/direct_transport.rs | 28 +++++++++++++--------------- 3 files changed, 29 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa2c9cddaf..5cf4281fc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1354,7 +1354,7 @@ checksum = "b6e8aaa3f231bb4bd57b84b2d5dc3ae7f350265df8aa96492e0bc394a1571909" [[package]] name = "mediasoup" -version = "0.17.0" +version = "0.17.1" dependencies = [ "actix", "actix-web", @@ -1391,7 +1391,7 @@ dependencies = [ [[package]] name = "mediasoup-sys" -version = "0.9.0" +version = "0.9.1" dependencies = [ "planus", "planus-codegen", diff --git a/rust/src/router/consumer.rs b/rust/src/router/consumer.rs index 8d58811aea..8fa10b6420 100644 --- a/rust/src/router/consumer.rs +++ b/rust/src/router/consumer.rs @@ -593,8 +593,7 @@ enum Notification { ProducerClose, ProducerPause, ProducerResume, - // TODO. - // Rtp, + Rtp(Vec), Score(ConsumerScore), LayersChange(Option), Trace(ConsumerTraceEventData), @@ -608,6 +607,17 @@ impl Notification { notification::Event::ConsumerProducerClose => Ok(Notification::ProducerClose), notification::Event::ConsumerProducerPause => Ok(Notification::ProducerPause), notification::Event::ConsumerProducerResume => Ok(Notification::ProducerResume), + notification::Event::ConsumerRtp => { + let Ok(Some(notification::BodyRef::ConsumerRtpNotification(body))) = + notification.body() + else { + panic!("Wrong message from worker: {notification:?}"); + }; + + let rtp_notification_fbs = consumer::RtpNotification::try_from(body).unwrap(); + + Ok(Notification::Rtp(rtp_notification_fbs.data)) + } notification::Event::ConsumerScore => { let Ok(Some(notification::BodyRef::ConsumerScoreNotification(body))) = notification.body() @@ -841,14 +851,11 @@ impl Consumer { handlers.resume.call_simple(); } } - /* - * TODO. - Notification::Rtp => { + Notification::Rtp(data) => { handlers.rtp.call(|callback| { - callback(notification); + callback(&data); }); } - */ Notification::Score(consumer_score) => { *score.lock() = consumer_score.clone(); handlers.score.call_simple(&consumer_score); diff --git a/rust/src/router/direct_transport.rs b/rust/src/router/direct_transport.rs index 38a6857cf0..8222500022 100644 --- a/rust/src/router/direct_transport.rs +++ b/rust/src/router/direct_transport.rs @@ -231,8 +231,7 @@ struct Handlers { #[serde(tag = "event", rename_all = "lowercase", content = "data")] enum Notification { Trace(TransportTraceEventData), - // TODO. - // Rtcp, + Rtcp(Vec), } impl Notification { @@ -252,17 +251,18 @@ impl Notification { Ok(Notification::Trace(trace_notification)) } - /* - * TODO. notification::Event::DirecttransportRtcp => { - let Ok(Some(notification::BodyRef::RtcpNotification(_body))) = notification.body() + let Ok(Some(notification::BodyRef::DirectTransportRtcpNotification(body))) = + notification.body() else { panic!("Wrong message from worker: {notification:?}"); }; - Ok(Notification::Rtcp) + let rtcp_notification_fbs = + direct_transport::RtcpNotification::try_from(body).unwrap(); + + Ok(Notification::Rtcp(rtcp_notification_fbs.data)) } - */ _ => Err(NotificationParseError::InvalidEvent), } } @@ -574,14 +574,12 @@ impl DirectTransport { Ok(notification) => match notification { Notification::Trace(trace_event_data) => { handlers.trace.call_simple(&trace_event_data); - } /* - * TODO. - Notification::Rtcp => { - handlers.rtcp.call(|callback| { - callback(notification); - }); - } - */ + } + Notification::Rtcp(data) => { + handlers.rtcp.call(|callback| { + callback(&data); + }); + } }, Err(error) => { error!("Failed to parse notification: {}", error);