From 84e4fe7f59df6dd044bd9dbcbdf4621910bb4717 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 25 Feb 2021 11:31:32 +0200 Subject: [PATCH 1/4] Add new timestamp mode based on the receive time and timecode In addition to the old one based on the receive time and timestamp. Also make that new mode the default as it will usually give more accurate results because the timestamp is just the send time while the timecode is usually set by the sender based on the media timestamps. --- src/lib.rs | 10 ++++++---- src/ndiaudiosrc.rs | 4 ++-- src/ndivideosrc.rs | 4 ++-- src/receiver.rs | 7 ++++++- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 0a665d1..8d201a5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,12 +20,14 @@ use once_cell::sync::Lazy; #[repr(u32)] #[genum(type_name = "GstNdiTimestampMode")] pub enum TimestampMode { - #[genum(name = "Receive Time", nick = "receive-time")] - ReceiveTime = 0, + #[genum(name = "Receive Time / Timecode", nick = "receive-time-vs-timecode")] + ReceiveTimeTimecode = 0, + #[genum(name = "Receive Time / Timestamp", nick = "receive-time-vs-timestamp")] + ReceiveTimeTimestamp = 1, #[genum(name = "NDI Timecode", nick = "timecode")] - Timecode = 1, + Timecode = 2, #[genum(name = "NDI Timestamp", nick = "timestamp")] - Timestamp = 2, + Timestamp = 3, } fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { diff --git a/src/ndiaudiosrc.rs b/src/ndiaudiosrc.rs index 4a05f44..c4a0ca0 100644 --- a/src/ndiaudiosrc.rs +++ b/src/ndiaudiosrc.rs @@ -39,7 +39,7 @@ impl Default for Settings { connect_timeout: 10000, timeout: 5000, bandwidth: ndisys::NDIlib_recv_bandwidth_highest, - timestamp_mode: TimestampMode::ReceiveTime, + timestamp_mode: TimestampMode::ReceiveTimeTimecode, } } } @@ -111,7 +111,7 @@ static PROPERTIES: [subclass::Property; 7] = [ "Timestamp Mode", "Timestamp information to use for outgoing PTS", TimestampMode::static_type(), - TimestampMode::ReceiveTime as i32, + TimestampMode::ReceiveTimeTimecode as i32, glib::ParamFlags::READWRITE, ) }), diff --git a/src/ndivideosrc.rs b/src/ndivideosrc.rs index ba358bb..e73204c 100644 --- a/src/ndivideosrc.rs +++ b/src/ndivideosrc.rs @@ -40,7 +40,7 @@ impl Default for Settings { connect_timeout: 10000, timeout: 5000, bandwidth: ndisys::NDIlib_recv_bandwidth_highest, - timestamp_mode: TimestampMode::ReceiveTime, + timestamp_mode: TimestampMode::ReceiveTimeTimecode, } } } @@ -112,7 +112,7 @@ static PROPERTIES: [subclass::Property; 7] = [ "Timestamp Mode", "Timestamp information to use for outgoing PTS", TimestampMode::static_type(), - TimestampMode::ReceiveTime as i32, + TimestampMode::ReceiveTimeTimecode as i32, glib::ParamFlags::READWRITE, ) }), diff --git a/src/receiver.rs b/src/receiver.rs index b229962..d88e95f 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -812,7 +812,12 @@ impl Receiver { ); let (pts, duration) = match self.0.timestamp_mode { - TimestampMode::ReceiveTime => self.0.observations.process( + TimestampMode::ReceiveTimeTimecode => { + self.0 + .observations + .process(self.0.cat, element, (timecode, receive_time), duration) + } + TimestampMode::ReceiveTimeTimestamp => self.0.observations.process( self.0.cat, element, (timestamp, receive_time), From 83962cbb8ca2f69904cbfca7d60f35634b1d8745 Mon Sep 17 00:00:00 2001 From: Luke Moscrop Date: Tue, 23 Feb 2021 13:19:31 +0200 Subject: [PATCH 2/4] Add FFI bindings for NDIlib_send_instance_t and related functions --- src/ndisys.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/ndisys.rs b/src/ndisys.rs index ddadd43..f1c9913 100644 --- a/src/ndisys.rs +++ b/src/ndisys.rs @@ -62,6 +62,20 @@ extern "C" { p_instance: NDIlib_recv_instance_t, p_total: *mut NDIlib_recv_queue_t, ); + pub fn NDIlib_send_create( + p_create_settings: *const NDIlib_send_create_t + ) -> NDIlib_send_instance_t; + pub fn NDIlib_send_destroy( + p_instance: NDIlib_send_instance_t + ); + pub fn NDIlib_send_send_video_v2( + p_instance: NDIlib_send_instance_t, + p_video_data: *const NDIlib_video_frame_v2_t + ); + pub fn NDIlib_send_send_audio_v2( + p_instance: NDIlib_send_instance_t, + p_audio_data: *const NDIlib_audio_frame_v2_t + ); } pub type NDIlib_find_instance_t = *mut ::std::os::raw::c_void; @@ -147,6 +161,17 @@ pub struct NDIlib_recv_create_v3_t { pub type NDIlib_recv_instance_t = *mut ::std::os::raw::c_void; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct NDIlib_send_create_t { + pub p_ndi_name: *const ::std::os::raw::c_char, + pub p_groups: *const ::std::os::raw::c_char, + pub clock_video: bool, + pub clock_audio: bool, +} + +pub type NDIlib_send_instance_t = *mut ::std::os::raw::c_void; + #[repr(C)] #[derive(Debug, Copy, Clone)] pub struct NDIlib_tally_t { From e7b3b87757e0337d24afeb56be7ba29612b6986b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Tue, 23 Feb 2021 15:26:58 +0200 Subject: [PATCH 3/4] Add safe bindings for the send API Including creation of frames from GStreamer buffers. --- src/ndi.rs | 340 +++++++++++++++++++++++++++++++++++++++++++++----- src/ndisys.rs | 10 +- 2 files changed, 313 insertions(+), 37 deletions(-) diff --git a/src/ndi.rs b/src/ndi.rs index 1ff5449..6c963ef 100644 --- a/src/ndi.rs +++ b/src/ndi.rs @@ -5,6 +5,8 @@ use std::mem; use std::ptr; use std::sync::{Arc, Mutex}; +use byte_slice_cast::*; + pub fn initialize() -> bool { unsafe { NDIlib_initialize() } } @@ -339,11 +341,17 @@ impl RecvInstance { match res { NDIlib_frame_type_e::NDIlib_frame_type_audio => { assert!(audio); - Ok(Some(Frame::Audio(AudioFrame::Borrowed(audio_frame, self)))) + Ok(Some(Frame::Audio(AudioFrame::BorrowedRecv( + audio_frame, + self, + )))) } NDIlib_frame_type_e::NDIlib_frame_type_video => { assert!(video); - Ok(Some(Frame::Video(VideoFrame::Borrowed(video_frame, self)))) + Ok(Some(Frame::Video(VideoFrame::BorrowedRecv( + video_frame, + self, + )))) } NDIlib_frame_type_e::NDIlib_frame_type_metadata => { assert!(metadata); @@ -365,6 +373,80 @@ impl Drop for RecvInstanceInner { } } +#[derive(Debug)] +pub struct SendBuilder<'a> { + ndi_name: &'a str, + clock_audio: bool, + clock_video: bool, +} + +impl<'a> SendBuilder<'a> { + pub fn clock_audio(self) -> Self { + Self { + clock_audio: true, + ..self + } + } + + pub fn clock_video(self) -> Self { + Self { + clock_video: true, + ..self + } + } + + pub fn build(self) -> Option { + unsafe { + let ndi_name = ffi::CString::new(self.ndi_name).unwrap(); + let ptr = NDIlib_send_create(&NDIlib_send_create_t { + p_ndi_name: ndi_name.as_ptr(), + clock_video: self.clock_video, + clock_audio: self.clock_audio, + p_groups: ptr::null(), + }); + + if ptr.is_null() { + None + } else { + Some(SendInstance(ptr::NonNull::new_unchecked(ptr))) + } + } + } +} + +#[derive(Debug)] +pub struct SendInstance(ptr::NonNull<::std::os::raw::c_void>); + +unsafe impl Send for SendInstance {} + +impl SendInstance { + pub fn builder<'a>(ndi_name: &'a str) -> SendBuilder<'a> { + SendBuilder { + ndi_name, + clock_video: false, + clock_audio: false, + } + } + + pub fn send_video(&mut self, frame: &VideoFrame) { + unsafe { + NDIlib_send_send_video_v2(self.0.as_ptr(), frame.as_ptr()); + } + } + + pub fn send_audio(&mut self, frame: &AudioFrame) { + unsafe { + NDIlib_send_send_audio_v2(self.0.as_ptr(), frame.as_ptr()); + } + } +} + +impl Drop for SendInstance { + fn drop(&mut self) { + unsafe { NDIlib_send_destroy(self.0.as_ptr() as *mut _) } + } +} + #[derive(Debug)] pub struct Tally(NDIlib_tally_t); unsafe impl Send for Tally {} @@ -405,49 +487,67 @@ pub enum Frame<'a> { #[derive(Debug)] pub enum VideoFrame<'a> { //Owned(NDIlib_video_frame_v2_t, Option, Option>), - Borrowed(NDIlib_video_frame_v2_t, &'a RecvInstance), + BorrowedRecv(NDIlib_video_frame_v2_t, &'a RecvInstance), + BorrowedGst( + NDIlib_video_frame_v2_t, + &'a gst_video::VideoFrameRef<&'a gst::BufferRef>, + ), } impl<'a> VideoFrame<'a> { pub fn xres(&self) -> i32 { match self { - VideoFrame::Borrowed(ref frame, _) => frame.xres, + VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => { + frame.xres + } } } pub fn yres(&self) -> i32 { match self { - VideoFrame::Borrowed(ref frame, _) => frame.yres, + VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => { + frame.yres + } } } pub fn fourcc(&self) -> NDIlib_FourCC_video_type_e { match self { - VideoFrame::Borrowed(ref frame, _) => frame.FourCC, + VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => { + frame.FourCC + } } } pub fn frame_rate(&self) -> (i32, i32) { match self { - VideoFrame::Borrowed(ref frame, _) => (frame.frame_rate_N, frame.frame_rate_D), + VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => { + (frame.frame_rate_N, frame.frame_rate_D) + } } } pub fn picture_aspect_ratio(&self) -> f32 { match self { - VideoFrame::Borrowed(ref frame, _) => frame.picture_aspect_ratio, + VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => { + frame.picture_aspect_ratio + } } } pub fn frame_format_type(&self) -> NDIlib_frame_format_type_e { match self { - VideoFrame::Borrowed(ref frame, _) => frame.frame_format_type, + VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => { + frame.frame_format_type + } } } pub fn timecode(&self) -> i64 { match self { - VideoFrame::Borrowed(ref frame, _) => frame.timecode, + VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => { + frame.timecode + } } } @@ -467,7 +567,7 @@ impl<'a> VideoFrame<'a> { unsafe { use std::slice; match self { - VideoFrame::Borrowed(ref frame, _) => { + VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => { slice::from_raw_parts(frame.p_data as *const u8, frame_size as usize) } } @@ -476,7 +576,7 @@ impl<'a> VideoFrame<'a> { pub fn line_stride_or_data_size_in_bytes(&self) -> i32 { match self { - VideoFrame::Borrowed(ref frame, _) => { + VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => { let stride = frame.line_stride_or_data_size_in_bytes; if stride != 0 { @@ -506,7 +606,7 @@ impl<'a> VideoFrame<'a> { pub fn metadata(&self) -> Option<&str> { unsafe { match self { - VideoFrame::Borrowed(ref frame, _) => { + VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => { if frame.p_metadata.is_null() { None } else { @@ -519,21 +619,137 @@ impl<'a> VideoFrame<'a> { pub fn timestamp(&self) -> i64 { match self { - VideoFrame::Borrowed(ref frame, _) => frame.timestamp, + VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => { + frame.timestamp + } } } pub fn as_ptr(&self) -> *const NDIlib_video_frame_v2_t { match self { - VideoFrame::Borrowed(ref frame, _) => frame, + VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => frame, } } + + pub fn try_from_video_frame( + frame: &'a gst_video::VideoFrameRef<&'a gst::BufferRef>, + timecode: i64, + ) -> Result { + // Planar formats must be in contiguous memory + let format = match frame.format() { + gst_video::VideoFormat::Uyvy => ndisys::NDIlib_FourCC_video_type_UYVY, + gst_video::VideoFormat::I420 => { + if (frame.plane_data(1).unwrap().as_ptr() as usize) + .checked_sub(frame.plane_data(0).unwrap().as_ptr() as usize) + != Some(frame.height() as usize * frame.plane_stride()[0] as usize) + { + return Err(()); + } + + if (frame.plane_data(2).unwrap().as_ptr() as usize) + .checked_sub(frame.plane_data(1).unwrap().as_ptr() as usize) + != Some((frame.height() as usize + 1) / 2 * frame.plane_stride()[1] as usize) + { + return Err(()); + } + + ndisys::NDIlib_FourCC_video_type_I420 + } + gst_video::VideoFormat::Nv12 => { + if (frame.plane_data(1).unwrap().as_ptr() as usize) + .checked_sub(frame.plane_data(0).unwrap().as_ptr() as usize) + != Some(frame.height() as usize * frame.plane_stride()[0] as usize) + { + return Err(()); + } + + ndisys::NDIlib_FourCC_video_type_NV12 + } + gst_video::VideoFormat::Nv21 => { + if (frame.plane_data(1).unwrap().as_ptr() as usize) + .checked_sub(frame.plane_data(0).unwrap().as_ptr() as usize) + != Some(frame.height() as usize * frame.plane_stride()[0] as usize) + { + return Err(()); + } + + ndisys::NDIlib_FourCC_video_type_NV12 + } + gst_video::VideoFormat::Yv12 => { + if (frame.plane_data(1).unwrap().as_ptr() as usize) + .checked_sub(frame.plane_data(0).unwrap().as_ptr() as usize) + != Some(frame.height() as usize * frame.plane_stride()[0] as usize) + { + return Err(()); + } + + if (frame.plane_data(2).unwrap().as_ptr() as usize) + .checked_sub(frame.plane_data(1).unwrap().as_ptr() as usize) + != Some((frame.height() as usize + 1) / 2 * frame.plane_stride()[1] as usize) + { + return Err(()); + } + + ndisys::NDIlib_FourCC_video_type_YV12 + } + gst_video::VideoFormat::Bgra => ndisys::NDIlib_FourCC_video_type_BGRA, + gst_video::VideoFormat::Bgrx => ndisys::NDIlib_FourCC_video_type_BGRX, + gst_video::VideoFormat::Rgba => ndisys::NDIlib_FourCC_video_type_RGBA, + gst_video::VideoFormat::Rgbx => ndisys::NDIlib_FourCC_video_type_RGBX, + _ => return Err(()), + }; + + let frame_format_type = match frame.info().interlace_mode() { + gst_video::VideoInterlaceMode::Progressive => { + NDIlib_frame_format_type_e::NDIlib_frame_format_type_progressive + } + gst_video::VideoInterlaceMode::Interleaved => { + NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved + } + // FIXME: Is this correct? + #[cfg(feature = "interlaced-fields")] + gst_video::VideoInterlaceMode::Alternate + if frame.flags().contains(gst_video::VideoFrameFlags::TFF) => + { + NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_0 + } + #[cfg(feature = "interlaced-fields")] + gst_video::VideoInterlaceMode::Alternate + if !frame.flags().contains(gst_video::VideoFrameFlags::TFF) => + { + NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_1 + } + _ => return Err(()), + }; + + let picture_aspect_ratio = + frame.info().par() * gst::Fraction::new(frame.width() as i32, frame.height() as i32); + let picture_aspect_ratio = + *picture_aspect_ratio.numer() as f32 / *picture_aspect_ratio.denom() as f32; + + let ndi_frame = NDIlib_video_frame_v2_t { + xres: frame.width() as i32, + yres: frame.height() as i32, + FourCC: format, + frame_rate_N: *frame.info().fps().numer(), + frame_rate_D: *frame.info().fps().denom(), + picture_aspect_ratio, + frame_format_type, + timecode, + p_data: frame.plane_data(0).unwrap().as_ptr() as *const i8, + line_stride_or_data_size_in_bytes: frame.plane_stride()[0], + p_metadata: ptr::null(), + timestamp: 0, + }; + + Ok(VideoFrame::BorrowedGst(ndi_frame, frame)) + } } impl<'a> Drop for VideoFrame<'a> { #[allow(irrefutable_let_patterns)] fn drop(&mut self) { - if let VideoFrame::Borrowed(ref mut frame, ref recv) = *self { + if let VideoFrame::BorrowedRecv(ref mut frame, ref recv) = *self { unsafe { NDIlib_recv_free_video_v2(((recv.0).0).0.as_ptr() as *mut _, frame); } @@ -543,32 +759,44 @@ impl<'a> Drop for VideoFrame<'a> { #[derive(Debug)] pub enum AudioFrame<'a> { - //Owned(NDIlib_audio_frame_v2_t, Option, Option>), - Borrowed(NDIlib_audio_frame_v2_t, &'a RecvInstance), + Owned( + NDIlib_audio_frame_v2_t, + Option, + Option>, + ), + BorrowedRecv(NDIlib_audio_frame_v2_t, &'a RecvInstance), } impl<'a> AudioFrame<'a> { pub fn sample_rate(&self) -> i32 { match self { - AudioFrame::Borrowed(ref frame, _) => frame.sample_rate, + AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => { + frame.sample_rate + } } } pub fn no_channels(&self) -> i32 { match self { - AudioFrame::Borrowed(ref frame, _) => frame.no_channels, + AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => { + frame.no_channels + } } } pub fn no_samples(&self) -> i32 { match self { - AudioFrame::Borrowed(ref frame, _) => frame.no_samples, + AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => { + frame.no_samples + } } } pub fn timecode(&self) -> i64 { match self { - AudioFrame::Borrowed(ref frame, _) => frame.timecode, + AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => { + frame.timecode + } } } @@ -576,24 +804,28 @@ impl<'a> AudioFrame<'a> { unsafe { use std::slice; match self { - AudioFrame::Borrowed(ref frame, _) => slice::from_raw_parts( - frame.p_data as *const u8, - (frame.no_samples * frame.channel_stride_in_bytes) as usize, - ), + AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => { + slice::from_raw_parts( + frame.p_data as *const u8, + (frame.no_samples * frame.channel_stride_in_bytes) as usize, + ) + } } } } pub fn channel_stride_in_bytes(&self) -> i32 { match self { - AudioFrame::Borrowed(ref frame, _) => frame.channel_stride_in_bytes, + AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => { + frame.channel_stride_in_bytes + } } } pub fn metadata(&self) -> Option<&str> { unsafe { match self { - AudioFrame::Borrowed(ref frame, _) => { + AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => { if frame.p_metadata.is_null() { None } else { @@ -606,13 +838,15 @@ impl<'a> AudioFrame<'a> { pub fn timestamp(&self) -> i64 { match self { - AudioFrame::Borrowed(ref frame, _) => frame.timestamp, + AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => { + frame.timestamp + } } } pub fn as_ptr(&self) -> *const NDIlib_audio_frame_v2_t { match self { - AudioFrame::Borrowed(ref frame, _) => frame, + AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => frame, } } @@ -635,12 +869,56 @@ impl<'a> AudioFrame<'a> { NDIlib_util_audio_to_interleaved_16s_v2(self.as_ptr(), &mut dst); } } + + pub fn try_from_interleaved_16s( + info: &gst_audio::AudioInfo, + buffer: &gst::BufferRef, + timecode: i64, + ) -> Result { + if info.format() != gst_audio::AUDIO_FORMAT_S16 { + return Err(()); + } + + let map = buffer.map_readable().map_err(|_| ())?; + let src_data = map.as_slice_of::().map_err(|_| ())?; + + let src = NDIlib_audio_frame_interleaved_16s_t { + sample_rate: info.rate() as i32, + no_channels: info.channels() as i32, + no_samples: src_data.len() as i32 / info.channels() as i32, + timecode, + reference_level: 0, + p_data: src_data.as_ptr() as *mut i16, + }; + + let channel_stride_in_bytes = src.no_samples * mem::size_of::() as i32; + let mut dest_data = + Vec::with_capacity(channel_stride_in_bytes as usize * info.channels() as usize); + + let mut dest = NDIlib_audio_frame_v2_t { + sample_rate: src.sample_rate, + no_channels: src.no_channels, + no_samples: src.no_samples, + timecode: src.timecode, + p_data: dest_data.as_mut_ptr(), + channel_stride_in_bytes, + p_metadata: ptr::null(), + timestamp: 0, + }; + + unsafe { + NDIlib_util_audio_from_interleaved_16s_v2(&src, &mut dest); + dest_data.set_len(dest_data.capacity()); + } + + Ok(AudioFrame::Owned(dest, None, Some(dest_data))) + } } impl<'a> Drop for AudioFrame<'a> { #[allow(irrefutable_let_patterns)] fn drop(&mut self) { - if let AudioFrame::Borrowed(ref mut frame, ref recv) = *self { + if let AudioFrame::BorrowedRecv(ref mut frame, ref recv) = *self { unsafe { NDIlib_recv_free_audio_v2(((recv.0).0).0.as_ptr() as *mut _, frame); } diff --git a/src/ndisys.rs b/src/ndisys.rs index f1c9913..c5ad334 100644 --- a/src/ndisys.rs +++ b/src/ndisys.rs @@ -63,18 +63,16 @@ extern "C" { p_total: *mut NDIlib_recv_queue_t, ); pub fn NDIlib_send_create( - p_create_settings: *const NDIlib_send_create_t + p_create_settings: *const NDIlib_send_create_t, ) -> NDIlib_send_instance_t; - pub fn NDIlib_send_destroy( - p_instance: NDIlib_send_instance_t - ); + pub fn NDIlib_send_destroy(p_instance: NDIlib_send_instance_t); pub fn NDIlib_send_send_video_v2( p_instance: NDIlib_send_instance_t, - p_video_data: *const NDIlib_video_frame_v2_t + p_video_data: *const NDIlib_video_frame_v2_t, ); pub fn NDIlib_send_send_audio_v2( p_instance: NDIlib_send_instance_t, - p_audio_data: *const NDIlib_audio_frame_v2_t + p_audio_data: *const NDIlib_audio_frame_v2_t, ); } From 2345c455c142812974ed1a760bab8c92f5ff6931 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 25 Feb 2021 21:00:33 +0200 Subject: [PATCH 4/4] Add initial version of NDI sink The sink can accept audio or video directly, or if both should be provided at once it is necesary to use the ndisinkcombiner before the ndisink to merge both audio and video into the same stream. Fixes https://github.com/teltek/gst-plugin-ndi/issues/10 --- Cargo.toml | 3 +- src/lib.rs | 14 +- src/ndisink.rs | 358 +++++++++++++++++++++++ src/ndisinkcombiner.rs | 637 +++++++++++++++++++++++++++++++++++++++++ src/ndisinkmeta.rs | 145 ++++++++++ 5 files changed, 1155 insertions(+), 2 deletions(-) create mode 100644 src/ndisink.rs create mode 100644 src/ndisinkcombiner.rs create mode 100644 src/ndisinkmeta.rs diff --git a/Cargo.toml b/Cargo.toml index 00f97e7..379583f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,9 +20,10 @@ once_cell = "1.0" gst-plugin-version-helper = "0.2" [features] -default = ["interlaced-fields", "reference-timestamps"] +default = ["interlaced-fields", "reference-timestamps", "sink"] interlaced-fields = ["gst/v1_16", "gst-video/v1_16"] reference-timestamps = ["gst/v1_14"] +sink = ["gst/v1_18", "gst-base/v1_18"] [lib] name = "gstndi" diff --git a/src/lib.rs b/src/lib.rs index 8d201a5..5c0366b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,12 @@ use glib::prelude::*; mod device_provider; pub mod ndi; mod ndiaudiosrc; +#[cfg(feature = "sink")] +mod ndisink; +#[cfg(feature = "sink")] +mod ndisinkcombiner; +#[cfg(feature = "sink")] +pub mod ndisinkmeta; pub mod ndisys; mod ndivideosrc; pub mod receiver; @@ -35,9 +41,15 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { return Err(glib::glib_bool_error!("Cannot initialize NDI")); } + device_provider::register(plugin)?; + ndivideosrc::register(plugin)?; ndiaudiosrc::register(plugin)?; - device_provider::register(plugin)?; + #[cfg(feature = "sink")] + { + ndisinkcombiner::register(plugin)?; + ndisink::register(plugin)?; + } Ok(()) } diff --git a/src/ndisink.rs b/src/ndisink.rs new file mode 100644 index 0000000..b4f5a0b --- /dev/null +++ b/src/ndisink.rs @@ -0,0 +1,358 @@ +use glib::subclass; +use glib::subclass::prelude::*; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst::{gst_debug, gst_error, gst_error_msg, gst_info, gst_loggable_error, gst_trace}; +use gst_base::{subclass::prelude::*, BaseSinkExtManual}; + +use std::sync::Mutex; + +use once_cell::sync::Lazy; + +use super::ndi::SendInstance; + +static DEFAULT_SENDER_NDI_NAME: Lazy = Lazy::new(|| { + format!( + "GStreamer NDI Sink {}-{}", + env!("CARGO_PKG_VERSION"), + env!("COMMIT_ID") + ) +}); + +#[derive(Debug)] +struct Settings { + ndi_name: String, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + ndi_name: DEFAULT_SENDER_NDI_NAME.clone(), + } + } +} + +static PROPERTIES: [subclass::Property; 1] = [subclass::Property("ndi-name", |name| { + glib::ParamSpec::string( + name, + "NDI Name", + "NDI Name to use", + Some(DEFAULT_SENDER_NDI_NAME.as_ref()), + glib::ParamFlags::READWRITE, + ) +})]; + +struct State { + send: SendInstance, + video_info: Option, + audio_info: Option, +} + +pub struct NdiSink { + settings: Mutex, + state: Mutex>, +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new("ndisink", gst::DebugColorFlags::empty(), Some("NDI Sink")) +}); + +impl ObjectSubclass for NdiSink { + const NAME: &'static str = "NdiSink"; + type ParentType = gst_base::BaseSink; + type Instance = gst::subclass::ElementInstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib::glib_object_subclass!(); + + fn new() -> Self { + Self { + settings: Mutex::new(Default::default()), + state: Mutex::new(Default::default()), + } + } + + fn class_init(klass: &mut subclass::simple::ClassStruct) { + klass.set_metadata( + "NDI Sink", + "Sink/Audio/Video", + "Render as an NDI stream", + "Sebastian Dröge ", + ); + + let caps = gst::Caps::builder_full() + .structure( + gst::Structure::builder("video/x-raw") + .field( + "format", + &gst::List::new(&[ + &gst_video::VideoFormat::Uyvy.to_str(), + &gst_video::VideoFormat::I420.to_str(), + &gst_video::VideoFormat::Nv12.to_str(), + &gst_video::VideoFormat::Nv21.to_str(), + &gst_video::VideoFormat::Yv12.to_str(), + &gst_video::VideoFormat::Bgra.to_str(), + &gst_video::VideoFormat::Bgrx.to_str(), + &gst_video::VideoFormat::Rgba.to_str(), + &gst_video::VideoFormat::Rgbx.to_str(), + ]), + ) + .field("width", &gst::IntRange::::new(1, std::i32::MAX)) + .field("height", &gst::IntRange::::new(1, std::i32::MAX)) + .field( + "framerate", + &gst::FractionRange::new( + gst::Fraction::new(0, 1), + gst::Fraction::new(std::i32::MAX, 1), + ), + ) + .build(), + ) + .structure( + gst::Structure::builder("audio/x-raw") + .field("format", &gst_audio::AUDIO_FORMAT_S16.to_str()) + .field("rate", &gst::IntRange::::new(1, i32::MAX)) + .field("channels", &gst::IntRange::::new(1, i32::MAX)) + .field("layout", &"interleaved") + .build(), + ) + .build(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(sink_pad_template); + + klass.install_properties(&PROPERTIES); + } +} + +impl ObjectImpl for NdiSink { + glib::glib_object_impl!(); + + fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { + let prop = &PROPERTIES[id]; + match *prop { + subclass::Property("ndi-name", ..) => { + let mut settings = self.settings.lock().unwrap(); + settings.ndi_name = value + .get::() + .unwrap() + .unwrap_or_else(|| DEFAULT_SENDER_NDI_NAME.clone()); + } + _ => unimplemented!(), + }; + } + + fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { + let prop = &PROPERTIES[id]; + match *prop { + subclass::Property("ndi-name", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.ndi_name.to_value()) + } + _ => unimplemented!(), + } + } +} + +impl ElementImpl for NdiSink {} + +impl BaseSinkImpl for NdiSink { + fn start(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { + let mut state_storage = self.state.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + let send = SendInstance::builder(&settings.ndi_name) + .build() + .ok_or_else(|| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Could not create send instance"] + ) + })?; + + let state = State { + send, + video_info: None, + audio_info: None, + }; + *state_storage = Some(state); + gst_info!(CAT, obj: element, "Started"); + + Ok(()) + } + + fn stop(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { + let mut state_storage = self.state.lock().unwrap(); + + *state_storage = None; + gst_info!(CAT, obj: element, "Stopped"); + + Ok(()) + } + + fn unlock(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { + Ok(()) + } + + fn unlock_stop(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { + Ok(()) + } + + fn set_caps( + &self, + element: &gst_base::BaseSink, + caps: &gst::Caps, + ) -> Result<(), gst::LoggableError> { + gst_debug!(CAT, obj: element, "Setting caps {}", caps); + + let mut state_storage = self.state.lock().unwrap(); + let state = match &mut *state_storage { + None => return Err(gst_loggable_error!(CAT, "Sink not started yet")), + Some(ref mut state) => state, + }; + + let s = caps.get_structure(0).unwrap(); + if s.get_name() == "video/x-raw" { + let info = gst_video::VideoInfo::from_caps(caps) + .map_err(|_| gst_loggable_error!(CAT, "Couldn't parse caps {}", caps))?; + + state.video_info = Some(info); + state.audio_info = None; + } else { + let info = gst_audio::AudioInfo::from_caps(caps) + .map_err(|_| gst_loggable_error!(CAT, "Couldn't parse caps {}", caps))?; + + state.audio_info = Some(info); + state.video_info = None; + } + + Ok(()) + } + + fn render( + &self, + element: &gst_base::BaseSink, + buffer: &gst::Buffer, + ) -> Result { + let mut state_storage = self.state.lock().unwrap(); + let state = match &mut *state_storage { + None => return Err(gst::FlowError::Error), + Some(ref mut state) => state, + }; + + if let Some(ref info) = state.video_info { + if let Some(audio_meta) = buffer.get_meta::() { + for (buffer, info, timecode) in audio_meta.buffers() { + let frame = + crate::ndi::AudioFrame::try_from_interleaved_16s(info, buffer, *timecode) + .map_err(|_| { + gst_error!(CAT, obj: element, "Unsupported audio frame"); + gst::FlowError::NotNegotiated + })?; + + gst_trace!( + CAT, + obj: element, + "Sending audio buffer {:?} with timecode {} and format {:?}", + buffer, + if *timecode < 0 { + gst::CLOCK_TIME_NONE + } else { + gst::ClockTime::from(*timecode as u64 * 100) + }, + info, + ); + state.send.send_audio(&frame); + } + } + + // Skip empty/gap buffers from ndisinkcombiner + if buffer.get_size() != 0 { + let timecode = element + .get_segment() + .downcast::() + .ok() + .and_then(|segment| { + *(segment.to_running_time(buffer.get_pts()) + element.get_base_time()) + }) + .map(|time| (time / 100) as i64) + .unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize); + + let frame = gst_video::VideoFrameRef::from_buffer_ref_readable(buffer, info) + .map_err(|_| { + gst_error!(CAT, obj: element, "Failed to map buffer"); + gst::FlowError::Error + })?; + + let frame = crate::ndi::VideoFrame::try_from_video_frame(&frame, timecode) + .map_err(|_| { + gst_error!(CAT, obj: element, "Unsupported video frame"); + gst::FlowError::NotNegotiated + })?; + + gst_trace!( + CAT, + obj: element, + "Sending video buffer {:?} with timecode {} and format {:?}", + buffer, + if timecode < 0 { + gst::CLOCK_TIME_NONE + } else { + gst::ClockTime::from(timecode as u64 * 100) + }, + info + ); + state.send.send_video(&frame); + } + } else if let Some(ref info) = state.audio_info { + let timecode = element + .get_segment() + .downcast::() + .ok() + .and_then(|segment| { + *(segment.to_running_time(buffer.get_pts()) + element.get_base_time()) + }) + .map(|time| (time / 100) as i64) + .unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize); + + let frame = crate::ndi::AudioFrame::try_from_interleaved_16s(info, buffer, timecode) + .map_err(|_| { + gst_error!(CAT, obj: element, "Unsupported audio frame"); + gst::FlowError::NotNegotiated + })?; + + gst_trace!( + CAT, + obj: element, + "Sending audio buffer {:?} with timecode {} and format {:?}", + buffer, + if timecode < 0 { + gst::CLOCK_TIME_NONE + } else { + gst::ClockTime::from(timecode as u64 * 100) + }, + info, + ); + state.send.send_audio(&frame); + } else { + return Err(gst::FlowError::Error); + } + + Ok(gst::FlowSuccess::Ok) + } +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "ndisink", + gst::Rank::None, + NdiSink::get_type(), + ) +} diff --git a/src/ndisinkcombiner.rs b/src/ndisinkcombiner.rs new file mode 100644 index 0000000..ddc9179 --- /dev/null +++ b/src/ndisinkcombiner.rs @@ -0,0 +1,637 @@ +use glib::prelude::*; +use glib::subclass; +use glib::subclass::prelude::*; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst::{gst_debug, gst_error, gst_trace, gst_warning}; +use gst_base::prelude::*; +use gst_base::subclass::prelude::*; + +use std::mem; +use std::sync::Mutex; + +static CAT: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| { + gst::DebugCategory::new( + "ndisinkcombiner", + gst::DebugColorFlags::empty(), + Some("NDI sink audio/video combiner"), + ) +}); + +struct State { + // Note that this applies to the currently pending buffer on the pad and *not* + // to the current_video_buffer below! + video_info: Option, + audio_info: Option, + current_video_buffer: Option<(gst::Buffer, gst::ClockTime)>, + current_audio_buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>, +} + +struct NdiSinkCombiner { + video_pad: gst_base::AggregatorPad, + audio_pad: Mutex>, + state: Mutex>, +} + +impl ObjectSubclass for NdiSinkCombiner { + const NAME: &'static str = "NdiSinkCombiner"; + type ParentType = gst_base::Aggregator; + type Instance = gst::subclass::ElementInstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib::glib_object_subclass!(); + + fn class_init(klass: &mut subclass::simple::ClassStruct) { + klass.set_metadata( + "NDI Sink Combiner", + "Combiner/Audio/Video", + "NDI sink audio/video combiner", + "Sebastian Dröge ", + ); + + let caps = gst::Caps::builder("video/x-raw") + .field( + "format", + &gst::List::new(&[ + &gst_video::VideoFormat::Uyvy.to_str(), + &gst_video::VideoFormat::I420.to_str(), + &gst_video::VideoFormat::Nv12.to_str(), + &gst_video::VideoFormat::Nv21.to_str(), + &gst_video::VideoFormat::Yv12.to_str(), + &gst_video::VideoFormat::Bgra.to_str(), + &gst_video::VideoFormat::Bgrx.to_str(), + &gst_video::VideoFormat::Rgba.to_str(), + &gst_video::VideoFormat::Rgbx.to_str(), + ]), + ) + .field("width", &gst::IntRange::::new(1, i32::MAX)) + .field("height", &gst::IntRange::::new(1, i32::MAX)) + .field( + "framerate", + &gst::FractionRange::new( + gst::Fraction::new(1, i32::MAX), + gst::Fraction::new(i32::MAX, 1), + ), + ) + .build(); + let src_pad_template = gst::PadTemplate::with_gtype( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + gst_base::AggregatorPad::static_type(), + ) + .unwrap(); + klass.add_pad_template(src_pad_template); + + let sink_pad_template = gst::PadTemplate::with_gtype( + "video", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + gst_base::AggregatorPad::static_type(), + ) + .unwrap(); + klass.add_pad_template(sink_pad_template); + + let caps = gst::Caps::builder("audio/x-raw") + .field("format", &gst_audio::AUDIO_FORMAT_S16.to_str()) + .field("rate", &gst::IntRange::::new(1, i32::MAX)) + .field("channels", &gst::IntRange::::new(1, i32::MAX)) + .field("layout", &"interleaved") + .build(); + let sink_pad_template = gst::PadTemplate::with_gtype( + "audio", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &caps, + gst_base::AggregatorPad::static_type(), + ) + .unwrap(); + klass.add_pad_template(sink_pad_template); + } + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.get_pad_template("video").unwrap(); + let video_pad = + gst::PadBuilder::::from_template(&templ, Some("video")) + .build(); + + Self { + video_pad, + audio_pad: Mutex::new(None), + state: Mutex::new(None), + } + } +} + +impl ObjectImpl for NdiSinkCombiner { + glib::glib_object_impl!(); + + fn constructed(&self, obj: &glib::Object) { + let element = obj.downcast_ref::().unwrap(); + element.add_pad(&self.video_pad).unwrap(); + + self.parent_constructed(obj); + } +} + +impl ElementImpl for NdiSinkCombiner { + fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) { + let mut audio_pad_storage = self.audio_pad.lock().unwrap(); + + if audio_pad_storage.as_ref().map(|p| p.upcast_ref()) == Some(pad) { + gst_debug!(CAT, obj: element, "Release audio pad"); + self.parent_release_pad(element, pad); + *audio_pad_storage = None; + } + } +} + +impl AggregatorImpl for NdiSinkCombiner { + fn create_new_pad( + &self, + agg: &gst_base::Aggregator, + templ: &gst::PadTemplate, + _req_name: Option<&str>, + _caps: Option<&gst::Caps>, + ) -> Option { + let mut audio_pad_storage = self.audio_pad.lock().unwrap(); + + if audio_pad_storage.is_some() { + gst_error!(CAT, obj: agg, "Audio pad already requested"); + return None; + } + + let sink_templ = agg.get_pad_template("audio").unwrap(); + if templ != &sink_templ { + gst_error!(CAT, obj: agg, "Wrong pad template"); + return None; + } + + let pad = + gst::PadBuilder::::from_template(templ, Some("audio")).build(); + *audio_pad_storage = Some(pad.clone()); + + gst_debug!(CAT, obj: agg, "Requested audio pad"); + + Some(pad) + } + + fn start(&self, agg: &gst_base::Aggregator) -> Result<(), gst::ErrorMessage> { + let mut state_storage = self.state.lock().unwrap(); + *state_storage = Some(State { + audio_info: None, + video_info: None, + current_video_buffer: None, + current_audio_buffers: Vec::new(), + }); + + gst_debug!(CAT, obj: agg, "Started"); + + Ok(()) + } + + fn stop(&self, agg: &gst_base::Aggregator) -> Result<(), gst::ErrorMessage> { + // Drop our state now + let _ = self.state.lock().unwrap().take(); + + gst_debug!(CAT, obj: agg, "Stopped"); + + Ok(()) + } + + fn get_next_time(&self, _agg: &gst_base::Aggregator) -> gst::ClockTime { + // FIXME: What to do here? We don't really know when the next buffer is expected + gst::CLOCK_TIME_NONE + } + + fn clip( + &self, + agg: &gst_base::Aggregator, + agg_pad: &gst_base::AggregatorPad, + mut buffer: gst::Buffer, + ) -> Option { + let segment = match agg_pad.get_segment().downcast::() { + Ok(segment) => segment, + Err(_) => { + gst_error!(CAT, obj: agg, "Only TIME segments supported"); + return Some(buffer); + } + }; + + let pts = buffer.get_pts(); + if pts.is_none() { + gst_error!(CAT, obj: agg, "Only buffers with PTS supported"); + return Some(buffer); + } + + let duration = if buffer.get_duration().is_some() { + buffer.get_duration() + } else { + gst::CLOCK_TIME_NONE + }; + + gst_trace!( + CAT, + obj: agg_pad, + "Clipping buffer {:?} with PTS {} and duration {}", + buffer, + pts, + duration + ); + + let state_storage = self.state.lock().unwrap(); + let state = match &*state_storage { + Some(ref state) => state, + None => return None, + }; + + let duration = if buffer.get_duration().is_some() { + buffer.get_duration() + } else if let Some(ref audio_info) = state.audio_info { + gst::SECOND + .mul_div_floor( + buffer.get_size() as u64, + audio_info.rate() as u64 * audio_info.bpf() as u64, + ) + .unwrap() + } else if let Some(ref video_info) = state.video_info { + if *video_info.fps().numer() > 0 { + gst::SECOND + .mul_div_floor( + *video_info.fps().denom() as u64, + *video_info.fps().numer() as u64, + ) + .unwrap() + } else { + gst::CLOCK_TIME_NONE + } + } else { + unreachable!() + }; + + gst_debug!( + CAT, + obj: agg_pad, + "Clipping buffer {:?} with PTS {} and duration {}", + buffer, + pts, + duration + ); + + if agg_pad == &self.video_pad { + segment.clip(pts, pts + duration).map(|(start, stop)| { + { + let buffer = buffer.make_mut(); + buffer.set_pts(start); + if duration.is_some() { + buffer.set_duration(stop - start); + } + } + + buffer + }) + } else if let Some(ref audio_info) = state.audio_info { + gst_audio::audio_buffer_clip( + buffer, + segment.upcast_ref(), + audio_info.rate(), + audio_info.bpf(), + ) + } else { + // Can't really have audio buffers without caps + unreachable!(); + } + } + + fn aggregate( + &self, + agg: &gst_base::Aggregator, + timeout: bool, + ) -> Result { + // FIXME: Can't really happen because we always return NONE from get_next_time() but that + // should be improved! + assert!(!timeout); + + // Because peek_buffer() can call into clip() and that would take the state lock again, + // first try getting buffers from both pads here + let video_buffer_and_segment = match self.video_pad.peek_buffer() { + Some(video_buffer) => { + let video_segment = self.video_pad.get_segment(); + let video_segment = match video_segment.downcast::() { + Ok(video_segment) => video_segment, + Err(video_segment) => { + gst_error!( + CAT, + obj: agg, + "Video segment of wrong format {:?}", + video_segment.get_format() + ); + return Err(gst::FlowError::Error); + } + }; + + Some((video_buffer, video_segment)) + } + None if !self.video_pad.is_eos() => { + gst_trace!(CAT, obj: agg, "Waiting for video buffer"); + return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + } + None => None, + }; + + let audio_buffer_segment_and_pad; + if let Some(audio_pad) = self.audio_pad.lock().unwrap().clone() { + audio_buffer_segment_and_pad = match audio_pad.peek_buffer() { + Some(audio_buffer) if audio_buffer.get_size() == 0 => { + // Skip empty/gap audio buffer + audio_pad.drop_buffer(); + gst_trace!(CAT, obj: agg, "Empty audio buffer, waiting for next"); + return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + } + Some(audio_buffer) => { + let audio_segment = audio_pad.get_segment(); + let audio_segment = match audio_segment.downcast::() { + Ok(audio_segment) => audio_segment, + Err(audio_segment) => { + gst_error!( + CAT, + obj: agg, + "Audio segment of wrong format {:?}", + audio_segment.get_format() + ); + return Err(gst::FlowError::Error); + } + }; + + Some((audio_buffer, audio_segment, audio_pad)) + } + None if !audio_pad.is_eos() => { + gst_trace!(CAT, obj: agg, "Waiting for audio buffer"); + return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + } + None => None, + }; + } else { + audio_buffer_segment_and_pad = None; + } + + let mut state_storage = self.state.lock().unwrap(); + let state = match &mut *state_storage { + Some(ref mut state) => state, + None => return Err(gst::FlowError::Flushing), + }; + + let (mut current_video_buffer, current_video_running_time_end, next_video_buffer) = + if let Some((video_buffer, video_segment)) = video_buffer_and_segment { + let video_running_time = video_segment.to_running_time(video_buffer.get_pts()); + assert!(video_running_time.is_some()); + + match state.current_video_buffer { + None => { + gst_trace!(CAT, obj: agg, "First video buffer, waiting for second"); + state.current_video_buffer = Some((video_buffer, video_running_time)); + drop(state_storage); + self.video_pad.drop_buffer(); + return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + } + Some((ref buffer, _)) => ( + buffer.clone(), + video_running_time, + Some((video_buffer, video_running_time)), + ), + } + } else { + match (&state.current_video_buffer, &audio_buffer_segment_and_pad) { + (None, None) => { + gst_trace!( + CAT, + obj: agg, + "All pads are EOS and no buffers are queued, finishing" + ); + return Err(gst::FlowError::Eos); + } + (None, Some((ref audio_buffer, ref audio_segment, _))) => { + // Create an empty dummy buffer for attaching the audio. This is going to + // be dropped by the sink later. + let audio_running_time = + audio_segment.to_running_time(audio_buffer.get_pts()); + assert!(audio_running_time.is_some()); + + let video_segment = self.video_pad.get_segment(); + let video_segment = match video_segment.downcast::() { + Ok(video_segment) => video_segment, + Err(video_segment) => { + gst_error!( + CAT, + obj: agg, + "Video segment of wrong format {:?}", + video_segment.get_format() + ); + return Err(gst::FlowError::Error); + } + }; + let video_pts = + video_segment.position_from_running_time(audio_running_time); + if video_pts.is_none() { + gst_warning!(CAT, obj: agg, "Can't output more audio after video EOS"); + return Err(gst::FlowError::Eos); + } + + let mut buffer = gst::Buffer::new(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(video_pts); + } + + (buffer, gst::CLOCK_TIME_NONE, None) + } + (Some((ref buffer, _)), _) => (buffer.clone(), gst::CLOCK_TIME_NONE, None), + } + }; + + if let Some((audio_buffer, audio_segment, audio_pad)) = audio_buffer_segment_and_pad { + let audio_info = match state.audio_info { + Some(ref audio_info) => audio_info, + None => { + gst_error!(CAT, obj: agg, "Have no audio caps"); + return Err(gst::FlowError::NotNegotiated); + } + }; + + let audio_running_time = audio_segment.to_running_time(audio_buffer.get_pts()); + assert!(audio_running_time.is_some()); + let duration = gst::SECOND + .mul_div_floor( + audio_buffer.get_size() as u64 / audio_info.bpf() as u64, + audio_info.rate() as u64, + ) + .unwrap_or(gst::CLOCK_TIME_NONE); + let audio_running_time_end = audio_running_time + duration; + assert!(audio_running_time_end.is_some()); + + if audio_running_time_end <= current_video_running_time_end + || current_video_running_time_end.is_none() + { + let timecode = (audio_running_time + agg.get_base_time()) + .map(|t| (t / 100) as i64) + .unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize); + + gst_trace!( + CAT, + obj: agg, + "Including audio buffer {:?} with timecode {}: {} <= {}", + audio_buffer, + timecode, + audio_running_time_end, + current_video_running_time_end, + ); + state + .current_audio_buffers + .push((audio_buffer, audio_info.clone(), timecode)); + audio_pad.drop_buffer(); + + // If there is still video data, wait for the next audio buffer or EOS, + // otherwise just output the dummy video buffer directly. + if current_video_running_time_end.is_some() { + return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + } + } + + // Otherwise finish this video buffer with all audio that has accumulated so + // far + } + + let audio_buffers = mem::replace(&mut state.current_audio_buffers, Vec::new()); + + if !audio_buffers.is_empty() { + let current_video_buffer = current_video_buffer.make_mut(); + crate::ndisinkmeta::NdiSinkAudioMeta::add(current_video_buffer, audio_buffers); + } + + if let Some((video_buffer, video_running_time)) = next_video_buffer { + state.current_video_buffer = Some((video_buffer, video_running_time)); + drop(state_storage); + self.video_pad.drop_buffer(); + } else { + state.current_video_buffer = None; + drop(state_storage); + } + + gst_trace!( + CAT, + obj: agg, + "Finishing video buffer {:?}", + current_video_buffer + ); + agg.finish_buffer(current_video_buffer) + } + + fn sink_event( + &self, + agg: &gst_base::Aggregator, + pad: &gst_base::AggregatorPad, + event: gst::Event, + ) -> bool { + use gst::EventView; + + match event.view() { + EventView::Caps(caps) => { + let caps = caps.get_caps_owned(); + + let mut state_storage = self.state.lock().unwrap(); + let state = match &mut *state_storage { + Some(ref mut state) => state, + None => return false, + }; + + if pad == &self.video_pad { + let info = match gst_video::VideoInfo::from_caps(&caps) { + Ok(info) => info, + Err(_) => { + gst_error!(CAT, obj: pad, "Failed to parse caps {:?}", caps); + return false; + } + }; + + // 2 frames latency because we queue 1 frame and wait until audio + // up to the end of that frame has arrived. + let latency = if *info.fps().numer() > 0 { + gst::SECOND + .mul_div_floor( + 2 * *info.fps().denom() as u64, + *info.fps().numer() as u64, + ) + .unwrap_or(80 * gst::MSECOND) + } else { + // let's assume 25fps and 2 frames latency + 80 * gst::MSECOND + }; + + state.video_info = Some(info); + + drop(state_storage); + + agg.set_latency(latency, gst::CLOCK_TIME_NONE); + + // The video caps are passed through as the audio is included only in a meta + agg.set_src_caps(&caps); + } else { + let info = match gst_audio::AudioInfo::from_caps(&caps) { + Ok(info) => info, + Err(_) => { + gst_error!(CAT, obj: pad, "Failed to parse caps {:?}", caps); + return false; + } + }; + + state.audio_info = Some(info); + } + } + // The video segment is passed through as-is and the video timestamps are preserved + EventView::Segment(segment) if pad == &self.video_pad => { + let segment = segment.get_segment(); + gst_debug!(CAT, obj: agg, "Updating segment {:?}", segment); + agg.update_segment(segment); + } + _ => (), + } + + self.parent_sink_event(agg, pad, event) + } + + fn sink_query( + &self, + agg: &gst_base::Aggregator, + pad: &gst_base::AggregatorPad, + query: &mut gst::QueryRef, + ) -> bool { + use gst::QueryView; + + match query.view_mut() { + QueryView::Caps(_) if pad == &self.video_pad => { + // Directly forward caps queries + let srcpad = agg.get_static_pad("src").unwrap(); + return srcpad.peer_query(query); + } + _ => (), + } + + self.parent_sink_query(agg, pad, query) + } + + fn negotiate(&self, _agg: &gst_base::Aggregator) -> bool { + // No negotiation needed as the video caps are just passed through + true + } +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "ndisinkcombiner", + gst::Rank::None, + NdiSinkCombiner::get_type(), + ) +} diff --git a/src/ndisinkmeta.rs b/src/ndisinkmeta.rs new file mode 100644 index 0000000..f53449e --- /dev/null +++ b/src/ndisinkmeta.rs @@ -0,0 +1,145 @@ +use gst::gst_sys; +use gst::prelude::*; +use std::fmt; +use std::mem; + +#[repr(transparent)] +pub struct NdiSinkAudioMeta(imp::NdiSinkAudioMeta); + +unsafe impl Send for NdiSinkAudioMeta {} +unsafe impl Sync for NdiSinkAudioMeta {} + +impl NdiSinkAudioMeta { + pub fn add( + buffer: &mut gst::BufferRef, + buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>, + ) -> gst::MetaRefMut { + unsafe { + // Manually dropping because gst_buffer_add_meta() takes ownership of the + // content of the struct + let mut params = mem::ManuallyDrop::new(imp::NdiSinkAudioMetaParams { buffers }); + + let meta = gst_sys::gst_buffer_add_meta( + buffer.as_mut_ptr(), + imp::ndi_sink_audio_meta_get_info(), + &mut *params as *mut imp::NdiSinkAudioMetaParams as glib::glib_sys::gpointer, + ) as *mut imp::NdiSinkAudioMeta; + + Self::from_mut_ptr(buffer, meta) + } + } + + pub fn buffers(&self) -> &[(gst::Buffer, gst_audio::AudioInfo, i64)] { + &self.0.buffers + } +} + +unsafe impl MetaAPI for NdiSinkAudioMeta { + type GstType = imp::NdiSinkAudioMeta; + + fn get_meta_api() -> glib::Type { + imp::ndi_sink_audio_meta_api_get_type() + } +} + +impl fmt::Debug for NdiSinkAudioMeta { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("NdiSinkAudioMeta") + .field("buffers", &self.buffers()) + .finish() + } +} + +mod imp { + use glib::glib_sys; + use glib::translate::*; + use gst::gst_sys; + use once_cell::sync::Lazy; + use std::mem; + use std::ptr; + + pub(super) struct NdiSinkAudioMetaParams { + pub buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>, + } + + #[repr(C)] + pub struct NdiSinkAudioMeta { + parent: gst_sys::GstMeta, + pub(super) buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>, + } + + pub(super) fn ndi_sink_audio_meta_api_get_type() -> glib::Type { + static TYPE: Lazy = Lazy::new(|| unsafe { + let t = from_glib(gst_sys::gst_meta_api_type_register( + b"GstNdiSinkAudioMetaAPI\0".as_ptr() as *const _, + [ptr::null::()].as_ptr() as *mut *const _, + )); + + assert_ne!(t, glib::Type::Invalid); + + t + }); + + *TYPE + } + + unsafe extern "C" fn ndi_sink_audio_meta_init( + meta: *mut gst_sys::GstMeta, + params: glib_sys::gpointer, + _buffer: *mut gst_sys::GstBuffer, + ) -> glib_sys::gboolean { + assert!(!params.is_null()); + + let meta = &mut *(meta as *mut NdiSinkAudioMeta); + let params = ptr::read(params as *const NdiSinkAudioMetaParams); + + ptr::write(&mut meta.buffers, params.buffers); + + true.to_glib() + } + + unsafe extern "C" fn ndi_sink_audio_meta_free( + meta: *mut gst_sys::GstMeta, + _buffer: *mut gst_sys::GstBuffer, + ) { + let meta = &mut *(meta as *mut NdiSinkAudioMeta); + + ptr::drop_in_place(&mut meta.buffers); + } + + unsafe extern "C" fn ndi_sink_audio_meta_transform( + dest: *mut gst_sys::GstBuffer, + meta: *mut gst_sys::GstMeta, + _buffer: *mut gst_sys::GstBuffer, + _type_: glib_sys::GQuark, + _data: glib_sys::gpointer, + ) -> glib_sys::gboolean { + let meta = &*(meta as *mut NdiSinkAudioMeta); + + super::NdiSinkAudioMeta::add(gst::BufferRef::from_mut_ptr(dest), meta.buffers.clone()); + + true.to_glib() + } + + pub(super) fn ndi_sink_audio_meta_get_info() -> *const gst_sys::GstMetaInfo { + struct MetaInfo(ptr::NonNull); + unsafe impl Send for MetaInfo {} + unsafe impl Sync for MetaInfo {} + + static META_INFO: Lazy = Lazy::new(|| unsafe { + MetaInfo( + ptr::NonNull::new(gst_sys::gst_meta_register( + ndi_sink_audio_meta_api_get_type().to_glib(), + b"GstNdiSinkAudioMeta\0".as_ptr() as *const _, + mem::size_of::(), + Some(ndi_sink_audio_meta_init), + Some(ndi_sink_audio_meta_free), + Some(ndi_sink_audio_meta_transform), + ) as *mut gst_sys::GstMetaInfo) + .expect("Failed to register meta API"), + ) + }); + + META_INFO.0.as_ptr() + } +}