From 9b67d2e0e3da5090ea5fa91a25042996f1a9e744 Mon Sep 17 00:00:00 2001 From: Tatsuyuki Ishi Date: Fri, 18 Mar 2022 16:35:29 +0900 Subject: [PATCH 1/7] platform: Conditional impl_platform_host based on feature flags --- src/platform/mod.rs | 54 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 12 deletions(-) diff --git a/src/platform/mod.rs b/src/platform/mod.rs index d75e9005e..7c0e02083 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -31,10 +31,11 @@ pub use self::platform_impl::*; // SupportedOutputConfigs and all their necessary trait implementations. // ``` macro_rules! impl_platform_host { - ($($HostVariant:ident $host_mod:ident $host_name:literal),*) => { + ($($(#[cfg($feat: meta)])? $HostVariant:ident $host_mod:ident $host_name:literal),*) => { /// All hosts supported by CPAL on this platform. pub const ALL_HOSTS: &'static [HostId] = &[ $( + $(#[cfg($feat)])? HostId::$HostVariant, )* ]; @@ -79,42 +80,49 @@ macro_rules! impl_platform_host { #[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] pub enum HostId { $( + $(#[cfg($feat)])? $HostVariant, )* } enum DeviceInner { $( + $(#[cfg($feat)])? $HostVariant(crate::host::$host_mod::Device), )* } enum DevicesInner { $( + $(#[cfg($feat)])? $HostVariant(crate::host::$host_mod::Devices), )* } enum HostInner { $( + $(#[cfg($feat)])? $HostVariant(crate::host::$host_mod::Host), )* } enum StreamInner { $( + $(#[cfg($feat)])? $HostVariant(crate::host::$host_mod::Stream), )* } enum SupportedInputConfigsInner { $( + $(#[cfg($feat)])? $HostVariant(crate::host::$host_mod::SupportedInputConfigs), )* } enum SupportedOutputConfigsInner { $( + $(#[cfg($feat)])? $HostVariant(crate::host::$host_mod::SupportedOutputConfigs), )* } @@ -123,6 +131,7 @@ macro_rules! impl_platform_host { pub fn name(&self) -> &'static str { match self { $( + $(#[cfg($feat)])? HostId::$HostVariant => $host_name, )* } @@ -134,6 +143,7 @@ macro_rules! impl_platform_host { pub fn id(&self) -> HostId { match self.0 { $( + $(#[cfg($feat)])? HostInner::$HostVariant(_) => HostId::$HostVariant, )* } @@ -146,6 +156,7 @@ macro_rules! impl_platform_host { fn next(&mut self) -> Option { match self.0 { $( + $(#[cfg($feat)])? DevicesInner::$HostVariant(ref mut d) => { d.next().map(DeviceInner::$HostVariant).map(Device::from) } @@ -156,6 +167,7 @@ macro_rules! impl_platform_host { fn size_hint(&self) -> (usize, Option) { match self.0 { $( + $(#[cfg($feat)])? DevicesInner::$HostVariant(ref d) => d.size_hint(), )* } @@ -168,6 +180,7 @@ macro_rules! impl_platform_host { fn next(&mut self) -> Option { match self.0 { $( + $(#[cfg($feat)])? SupportedInputConfigsInner::$HostVariant(ref mut s) => s.next(), )* } @@ -176,6 +189,7 @@ macro_rules! impl_platform_host { fn size_hint(&self) -> (usize, Option) { match self.0 { $( + $(#[cfg($feat)])? SupportedInputConfigsInner::$HostVariant(ref d) => d.size_hint(), )* } @@ -188,6 +202,7 @@ macro_rules! impl_platform_host { fn next(&mut self) -> Option { match self.0 { $( + $(#[cfg($feat)])? SupportedOutputConfigsInner::$HostVariant(ref mut s) => s.next(), )* } @@ -196,6 +211,7 @@ macro_rules! impl_platform_host { fn size_hint(&self) -> (usize, Option) { match self.0 { $( + $(#[cfg($feat)])? SupportedOutputConfigsInner::$HostVariant(ref d) => d.size_hint(), )* } @@ -210,6 +226,7 @@ macro_rules! impl_platform_host { fn name(&self) -> Result { match self.0 { $( + $(#[cfg($feat)])? DeviceInner::$HostVariant(ref d) => d.name(), )* } @@ -218,6 +235,7 @@ macro_rules! impl_platform_host { fn supported_input_configs(&self) -> Result { match self.0 { $( + $(#[cfg($feat)])? DeviceInner::$HostVariant(ref d) => { d.supported_input_configs() .map(SupportedInputConfigsInner::$HostVariant) @@ -230,6 +248,7 @@ macro_rules! impl_platform_host { fn supported_output_configs(&self) -> Result { match self.0 { $( + $(#[cfg($feat)])? DeviceInner::$HostVariant(ref d) => { d.supported_output_configs() .map(SupportedOutputConfigsInner::$HostVariant) @@ -242,6 +261,7 @@ macro_rules! impl_platform_host { fn default_input_config(&self) -> Result { match self.0 { $( + $(#[cfg($feat)])? DeviceInner::$HostVariant(ref d) => d.default_input_config(), )* } @@ -250,6 +270,7 @@ macro_rules! impl_platform_host { fn default_output_config(&self) -> Result { match self.0 { $( + $(#[cfg($feat)])? DeviceInner::$HostVariant(ref d) => d.default_output_config(), )* } @@ -268,6 +289,7 @@ macro_rules! impl_platform_host { { match self.0 { $( + $(#[cfg($feat)])? DeviceInner::$HostVariant(ref d) => d .build_input_stream_raw( config, @@ -294,6 +316,7 @@ macro_rules! impl_platform_host { { match self.0 { $( + $(#[cfg($feat)])? DeviceInner::$HostVariant(ref d) => d .build_output_stream_raw( config, @@ -313,12 +336,17 @@ macro_rules! impl_platform_host { type Device = Device; fn is_available() -> bool { - $( crate::host::$host_mod::Host::is_available() ||)* false + $( + $(#[cfg($feat)])? + if crate::host::$host_mod::Host::is_available() { return true; } + )* + false } fn devices(&self) -> Result { match self.0 { $( + $(#[cfg($feat)])? HostInner::$HostVariant(ref h) => { h.devices().map(DevicesInner::$HostVariant).map(Devices::from) } @@ -329,6 +357,7 @@ macro_rules! impl_platform_host { fn default_input_device(&self) -> Option { match self.0 { $( + $(#[cfg($feat)])? HostInner::$HostVariant(ref h) => { h.default_input_device().map(DeviceInner::$HostVariant).map(Device::from) } @@ -339,6 +368,7 @@ macro_rules! impl_platform_host { fn default_output_device(&self) -> Option { match self.0 { $( + $(#[cfg($feat)])? HostInner::$HostVariant(ref h) => { h.default_output_device().map(DeviceInner::$HostVariant).map(Device::from) } @@ -351,6 +381,7 @@ macro_rules! impl_platform_host { fn play(&self) -> Result<(), crate::PlayStreamError> { match self.0 { $( + $(#[cfg($feat)])? StreamInner::$HostVariant(ref s) => { s.play() } @@ -361,6 +392,7 @@ macro_rules! impl_platform_host { fn pause(&self) -> Result<(), crate::PauseStreamError> { match self.0 { $( + $(#[cfg($feat)])? StreamInner::$HostVariant(ref s) => { s.pause() } @@ -394,24 +426,28 @@ macro_rules! impl_platform_host { } $( + $(#[cfg($feat)])? impl From for Device { fn from(h: crate::host::$host_mod::Device) -> Self { DeviceInner::$HostVariant(h).into() } } + $(#[cfg($feat)])? impl From for Devices { fn from(h: crate::host::$host_mod::Devices) -> Self { DevicesInner::$HostVariant(h).into() } } + $(#[cfg($feat)])? impl From for Host { fn from(h: crate::host::$host_mod::Host) -> Self { HostInner::$HostVariant(h).into() } } + $(#[cfg($feat)])? impl From for Stream { fn from(h: crate::host::$host_mod::Stream) -> Self { StreamInner::$HostVariant(h).into() @@ -423,6 +459,7 @@ macro_rules! impl_platform_host { pub fn available_hosts() -> Vec { let mut host_ids = vec![]; $( + $(#[cfg($feat)])? if ::is_available() { host_ids.push(HostId::$HostVariant); } @@ -434,6 +471,7 @@ macro_rules! impl_platform_host { pub fn host_from_id(id: HostId) -> Result { match id { $( + $(#[cfg($feat)])? HostId::$HostVariant => { crate::host::$host_mod::Host::new() .map(HostInner::$HostVariant) @@ -460,11 +498,7 @@ mod platform_impl { SupportedOutputConfigs as JackSupportedOutputConfigs, }; - #[cfg(feature = "jack")] - impl_platform_host!(Jack jack "JACK", Alsa alsa "ALSA"); - - #[cfg(not(feature = "jack"))] - impl_platform_host!(Alsa alsa "ALSA"); + impl_platform_host!(#[cfg(feature = "jack")] Jack jack "JACK", Alsa alsa "ALSA"); /// The default host for the current compilation target platform. pub fn default_host() -> Host { @@ -542,11 +576,7 @@ mod platform_impl { SupportedOutputConfigs as WasapiSupportedOutputConfigs, }; - #[cfg(feature = "asio")] - impl_platform_host!(Asio asio "ASIO", Wasapi wasapi "WASAPI"); - - #[cfg(not(feature = "asio"))] - impl_platform_host!(Wasapi wasapi "WASAPI"); + impl_platform_host!(#[cfg(feature = "asio")] Asio asio "ASIO", Wasapi wasapi "WASAPI"); /// The default host for the current compilation target platform. pub fn default_host() -> Host { From 8f0ca29a44af6231586c5eab1bd70a11317061e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maria=C5=84ski?= Date: Sat, 5 Mar 2022 18:09:41 +0100 Subject: [PATCH 2/7] Pipewire host & device --- Cargo.toml | 2 + src/host/mod.rs | 5 + src/host/pipewire/conn.rs | 140 +++++++++++++++++++++ src/host/pipewire/device.rs | 237 ++++++++++++++++++++++++++++++++++++ src/host/pipewire/mod.rs | 123 +++++++++++++++++++ src/platform/mod.rs | 8 +- 6 files changed, 514 insertions(+), 1 deletion(-) create mode 100644 src/host/pipewire/conn.rs create mode 100644 src/host/pipewire/device.rs create mode 100644 src/host/pipewire/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 15898dbda..696ff10db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,8 @@ nix = "0.23" libc = "0.2.65" parking_lot = "0.12" jack = { version = "0.9", optional = true } +# pipewire = { version = "0.4", optional = true } +pipewire = { git = "https://gitlab.freedesktop.org/pipewire/pipewire-rs", optional = true } [target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies] core-foundation-sys = "0.8.2" # For linking to CoreFoundation.framework and handling device name `CFString`s. diff --git a/src/host/mod.rs b/src/host/mod.rs index 555b01022..bae7309a9 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -11,6 +11,11 @@ pub(crate) mod emscripten; feature = "jack" ))] pub(crate) mod jack; +#[cfg(all( + any(target_os = "linux", target_os = "dragonfly", target_os = "freebsd"), + feature = "pipewire" +))] +pub(crate) mod pipewire; pub(crate) mod null; #[cfg(target_os = "android")] pub(crate) mod oboe; diff --git a/src/host/pipewire/conn.rs b/src/host/pipewire/conn.rs new file mode 100644 index 000000000..fe4e0e3f8 --- /dev/null +++ b/src/host/pipewire/conn.rs @@ -0,0 +1,140 @@ +extern crate pipewire; + +use self::pipewire::{ + metadata::Metadata, + prelude::*, + registry::{GlobalObject, Registry}, + spa::ForeignDict, + types::ObjectType, +}; + +use std::{ + cell::{Cell, RefCell}, + rc::Rc, + sync::mpsc, + thread, +}; + +enum Message { + Terminate, + GetSettings, +} + +enum MessageRepl { + Settings(Settings), +} + +pub struct PWClient { + pw_sender: pipewire::channel::Sender, + main_receiver: mpsc::Receiver, +} + +impl PWClient { + pub fn new() -> Self { + let (main_sender, main_receiver) = mpsc::channel(); + let (pw_sender, pw_receiver) = pipewire::channel::channel(); + + let pw_thread = thread::spawn(move || pw_thread(main_sender, pw_receiver)); + + Self { + pw_sender, + main_receiver, + } + } + + pub fn get_settings(&self) -> Settings { + self.pw_sender.send(Message::GetSettings); + + if let MessageRepl::Settings(settings) = self.main_receiver.recv().expect("Reply") { + settings + } else { + Settings::default() + } + } +} + +#[derive(Default)] +struct State { + settings: Settings, +} + +#[derive(Default, Clone)] +struct Settings { + pub sample_rate: u32, + pub min_buffer_size: u32, + pub max_buffer_size: u32, + pub default_buffer_size: u32, +} + +fn pw_thread( + main_sender: mpsc::Sender, + pw_receiver: pipewire::channel::Receiver, +) { + let state = Rc::new(State::default()); + // let state = Rc::new(RefCell::new(State::default())); + + let mainloop = pipewire::MainLoop::new().expect("Failed to create PipeWire Mainloop"); + + let context = pipewire::Context::new(&mainloop).expect("Failed to create PipeWire Context"); + let core = context + .connect(None) + .expect("Failed to connect to PipeWire"); + let registry = Rc::new(core.get_registry().expect("Failed to get Registry")); + + let _receiver = pw_receiver.attach(&mainloop, |msg| { + let mainloop = mainloop.clone(); + + match msg { + Message::Terminate => mainloop.quit(), + Message::GetSettings => { + main_sender.send(MessageRepl::Settings(state.settings.clone())); + } + } + }); + + let state_clone = state.clone(); + let _listener = registry + .add_listener_local() + .global(|global| match global.type_ { + ObjectType::Metadata => handle_metadata(global, state_clone, ®istry), + _ => {} + }); + + mainloop.run(); +} + +fn handle_metadata( + metadata: &GlobalObject, + state: Rc, + registry: &Rc, +) { + let props = metadata + .props + .as_ref() + .expect("Metadata object is missing properties"); + + match props.get("metadata.name") { + Some("settings") => { + let settings: Metadata = registry.bind(metadata).expect("Metadata"); + + settings + .add_listener_local() + .property(|_, key, _, value| { + if let Some(value) = value { + if let Ok(value) = value.parse::() { + match key { + Some("clock.rate") => state.settings.sample_rate = value, + Some("clock.quantum") => state.settings.default_buffer_size = value, + Some("clock.min-quantum") => state.settings.min_buffer_size = value, + Some("clock.max-quantum") => state.settings.max_buffer_size = value, + None => {} + }; + } + } + 0 + }) + .register(); + } + None => {} + }; +} diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs new file mode 100644 index 000000000..1c5e004f6 --- /dev/null +++ b/src/host/pipewire/device.rs @@ -0,0 +1,237 @@ +use crate::{ + BackendSpecificError, BuildStreamError, Data, DefaultStreamConfigError, DeviceNameError, + InputCallbackInfo, OutputCallbackInfo, SampleFormat, SampleRate, StreamConfig, StreamError, + SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange, + SupportedStreamConfigsError, +}; +use std::hash::{Hash, Hasher}; +use std::rc::Rc; +use traits::DeviceTrait; + +use super::stream::Stream; +use super::PIPEWIRE_SAMPLE_FORMAT; + +pub type SupportedInputConfigs = std::vec::IntoIter; +pub type SupportedOutputConfigs = std::vec::IntoIter; + +const DEFAULT_NUM_CHANNELS: u16 = 2; +const DEFAULT_SUPPORTED_CHANNELS: [u16; 10] = [1, 2, 4, 6, 8, 16, 24, 32, 48, 64]; + +/// If a device is for input or output. +/// Until we have duplex stream support PipeWire nodes and CPAL devices for PipeWire will be either input or output. +#[derive(Clone, Debug)] +pub enum DeviceType { + InputDevice, + OutputDevice, +} +#[derive(Clone)] +pub struct Device { + name: String, + sample_rate: SampleRate, + buffer_size: SupportedBufferSize, + device_type: DeviceType, + connect_ports_automatically: bool, + client: Rc +} + +impl Device { + fn new_device( + name: String, + connect_ports_automatically: bool, + device_type: DeviceType, + client: super::conn::PWClient, + ) -> Result { + let settings = client.get_settings(); + + Ok(Device { + name: "cpal_client".to_string(), + sample_rate: SampleRate(settings.sample_rate), + buffer_size: SupportedBufferSize::Range { + min: settings.min_buffer_size, + max: settings.max_buffer_size, + }, + device_type, + connect_ports_automatically, + client: Rc::new(client) + }) + } + + pub fn default_output_device( + name: &str, + connect_ports_automatically: bool, + client: super::conn::PWClient, + ) -> Result { + let output_client_name = format!("{}_out", name); + Device::new_device( + output_client_name, + connect_ports_automatically, + DeviceType::OutputDevice, + client, + ) + } + + pub fn default_input_device( + name: &str, + connect_ports_automatically: bool, + client: super::conn::PWClient, + ) -> Result { + let input_client_name = format!("{}_in", name); + Device::new_device( + input_client_name, + connect_ports_automatically, + DeviceType::InputDevice, + client, + ) + } + + pub fn default_config(&self) -> Result { + let channels = DEFAULT_NUM_CHANNELS; + let sample_rate = self.sample_rate; + let buffer_size = self.buffer_size.clone(); + // The sample format for JACK audio ports is always "32-bit float mono audio" in the current implementation. + // Custom formats are allowed within JACK, but this is of niche interest. + // The format can be found programmatically by calling jack::PortSpec::port_type() on a created port. + let sample_format = PIPEWIRE_SAMPLE_FORMAT; + Ok(SupportedStreamConfig { + channels, + sample_rate, + buffer_size, + sample_format, + }) + } + + pub fn supported_configs(&self) -> Vec { + let f = match self.default_config() { + Err(_) => return vec![], + Ok(f) => f, + }; + + let mut supported_configs = vec![]; + + for &channels in DEFAULT_SUPPORTED_CHANNELS.iter() { + supported_configs.push(SupportedStreamConfigRange { + channels, + min_sample_rate: f.sample_rate, + max_sample_rate: f.sample_rate, + buffer_size: f.buffer_size.clone(), + sample_format: f.sample_format, + }); + } + supported_configs + } + + pub fn is_input(&self) -> bool { + matches!(self.device_type, DeviceType::InputDevice) + } + + pub fn is_output(&self) -> bool { + matches!(self.device_type, DeviceType::OutputDevice) + } +} + +impl DeviceTrait for Device { + type SupportedInputConfigs = SupportedInputConfigs; + type SupportedOutputConfigs = SupportedOutputConfigs; + type Stream = Stream; + + fn name(&self) -> Result { + Ok(self.name.clone()) + } + + fn supported_input_configs( + &self, + ) -> Result { + Ok(self.supported_configs().into_iter()) + } + + fn supported_output_configs( + &self, + ) -> Result { + Ok(self.supported_configs().into_iter()) + } + + /// Returns the default input config + /// The sample format for JACK audio ports is always "32-bit float mono audio" unless using a custom type. + /// The sample rate is set by the JACK server. + fn default_input_config(&self) -> Result { + self.default_config() + } + + /// Returns the default output config + /// The sample format for JACK audio ports is always "32-bit float mono audio" unless using a custom type. + /// The sample rate is set by the JACK server. + fn default_output_config(&self) -> Result { + self.default_config() + } + + fn build_input_stream_raw( + &self, + conf: &StreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + ) -> Result + where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + if let DeviceType::OutputDevice = &self.device_type { + // Trying to create an input stream from an output device + return Err(BuildStreamError::StreamConfigNotSupported); + } + if conf.sample_rate != self.sample_rate || sample_format != PIPEWIRE_SAMPLE_FORMAT { + return Err(BuildStreamError::StreamConfigNotSupported); + } + + let mut stream = Stream::new_input(self.client, conf.channels, data_callback, error_callback); + + if self.connect_ports_automatically { + stream.connect_to_system_inputs(); + } + + Ok(stream) + } + + fn build_output_stream_raw( + &self, + conf: &StreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + ) -> Result + where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + if let DeviceType::InputDevice = &self.device_type { + // Trying to create an output stream from an input device + return Err(BuildStreamError::StreamConfigNotSupported); + } + if conf.sample_rate != self.sample_rate || sample_format != PIPEWIRE_SAMPLE_FORMAT { + return Err(BuildStreamError::StreamConfigNotSupported); + } + + let mut stream = Stream::new_output(self.client, conf.channels, data_callback, error_callback); + + if self.connect_ports_automatically { + stream.connect_to_system_outputs(); + } + + Ok(stream) + } +} + +impl PartialEq for Device { + fn eq(&self, other: &Self) -> bool { + // Device::name() can never fail in this implementation + self.name().unwrap() == other.name().unwrap() + } +} + +impl Eq for Device {} + +impl Hash for Device { + fn hash(&self, state: &mut H) { + self.name().unwrap().hash(state); + } +} diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs new file mode 100644 index 000000000..67613f5b1 --- /dev/null +++ b/src/host/pipewire/mod.rs @@ -0,0 +1,123 @@ +extern crate pipewire; + +use std::sync::mpsc; + +use crate::{DevicesError, SampleFormat, SupportedStreamConfigRange}; +use traits::HostTrait; + +mod device; +pub use self::device::Device; +pub use self::stream::Stream; +mod stream; +mod conn; + +const PIPEWIRE_SAMPLE_FORMAT: SampleFormat = SampleFormat::F32; + +pub type SupportedInputConfigs = std::vec::IntoIter; +pub type SupportedOutputConfigs = std::vec::IntoIter; +pub type Devices = std::vec::IntoIter; + +/// The PipeWire Host type + +pub struct Host { + /// The name that the client will have in PipeWire. + /// Until we have duplex streams two clients will be created adding "out" or "in" to the name + /// since names have to be unique. + name: String, + /// If ports are to be connected to the system (soundcard) ports automatically (default is true). + connect_ports_automatically: bool, + /// A list of the devices that have been created from this Host. + devices_created: Vec, + + client: conn::PWClient +} + +impl Host { + pub fn new() -> Result { + let client = conn::PWClient::new(); + + let mut host = Host { + name: "cpal_client".to_owned(), + connect_ports_automatically: true, + devices_created: vec![], + client + }; + + // Devices don't exist for PipeWire, they have to be created + host.initialize_default_devices(); + Ok(host) + } + /// Set whether the ports should automatically be connected to system + /// (default is true) + pub fn set_connect_automatically(&mut self, do_connect: bool) { + self.connect_ports_automatically = do_connect; + } + + pub fn input_device_with_name(&mut self, name: &str) -> Option { + self.name = name.to_owned(); + self.default_input_device() + } + + pub fn output_device_with_name(&mut self, name: &str) -> Option { + self.name = name.to_owned(); + self.default_output_device() + } + + fn initialize_default_devices(&mut self) { + let in_device_res = Device::default_input_device( + &self.name, + self.connect_ports_automatically, + self.client + ); + + match in_device_res { + Ok(device) => self.devices_created.push(device), + Err(err) => { + println!("{}", err); + } + } + + let out_device_res = Device::default_output_device( + &self.name, + self.connect_ports_automatically, + self.client + ); + match out_device_res { + Ok(device) => self.devices_created.push(device), + Err(err) => { + println!("{}", err); + } + } + } +} + +impl HostTrait for Host { + type Devices = Devices; + type Device = Device; + + fn is_available() -> bool { + true + } + + fn devices(&self) -> Result { + Ok(self.devices_created.clone().into_iter()) + } + + fn default_input_device(&self) -> Option { + for device in &self.devices_created { + if device.is_input() { + return Some(device.clone()); + } + } + None + } + + fn default_output_device(&self) -> Option { + for device in &self.devices_created { + if device.is_output() { + return Some(device.clone()); + } + } + None + } +} \ No newline at end of file diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 7c0e02083..93c78f1ff 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -497,8 +497,14 @@ mod platform_impl { SupportedInputConfigs as JackSupportedInputConfigs, SupportedOutputConfigs as JackSupportedOutputConfigs, }; + #[cfg(feature = "pipewire")] + pub use crate::host::pipewire::{ + Device as PipeWireDevice, Devices as PipeWireDevices, Host as PipeWireHost, Stream as PipeWireStream, + SupportedInputConfigs as PipeWireSupportedInputConfigs, + SupportedOutputConfigs as PipeWireupportedOutputConfigs, + }; - impl_platform_host!(#[cfg(feature = "jack")] Jack jack "JACK", Alsa alsa "ALSA"); + impl_platform_host!(#[cfg(feature = "pipewire")] PipeWire pipewire "PipeWire", #[cfg(feature = "jack")] Jack jack "JACK", Alsa alsa "ALSA"); /// The default host for the current compilation target platform. pub fn default_host() -> Host { From b5263ee958fe542c545ccc07672f524ede4fcc88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maria=C5=84ski?= Date: Sat, 5 Mar 2022 18:10:42 +0100 Subject: [PATCH 3/7] stream code base copied from jack host, nonfunctional --- src/host/pipewire/stream.rs | 464 ++++++++++++++++++++++++++++++++++++ 1 file changed, 464 insertions(+) create mode 100644 src/host/pipewire/stream.rs diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs new file mode 100644 index 000000000..8876db8da --- /dev/null +++ b/src/host/pipewire/stream.rs @@ -0,0 +1,464 @@ +use crate::ChannelCount; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; +use traits::StreamTrait; + +use crate::{ + BackendSpecificError, Data, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, + PlayStreamError, SampleRate, StreamError, +}; + +use super::PIPEWIRE_SAMPLE_FORMAT; + +type ErrorCallbackPtr = Arc>; + +pub struct Stream { + // TODO: It might be faster to send a message when playing/pausing than to check this every iteration + playing: Arc, + async_client: jack::AsyncClient, + // Port names are stored in order to connect them to other ports in jack automatically + input_port_names: Vec, + output_port_names: Vec, +} + +impl Stream { + // TODO: Return error messages + pub fn new_input( + client: Rc, + channels: ChannelCount, + data_callback: D, + mut error_callback: E, + ) -> Stream + where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + let mut ports = vec![]; + let mut port_names: Vec = vec![]; + // Create ports + for i in 0..channels { + let port_try = client.register_port(&format!("in_{}", i), jack::AudioIn::default()); + match port_try { + Ok(port) => { + // Get the port name in order to later connect it automatically + if let Ok(port_name) = port.name() { + port_names.push(port_name); + } + // Store the port into a Vec to move to the ProcessHandler + ports.push(port); + } + Err(e) => { + // If port creation failed, send the error back via the error_callback + error_callback( + BackendSpecificError { + description: e.to_string(), + } + .into(), + ); + } + } + } + + let playing = Arc::new(AtomicBool::new(true)); + + let error_callback_ptr = Arc::new(Mutex::new(error_callback)) as ErrorCallbackPtr; + + let input_process_handler = LocalProcessHandler::new( + vec![], + ports, + SampleRate(client.sample_rate() as u32), + client.buffer_size() as usize, + Some(Box::new(data_callback)), + None, + playing.clone(), + Arc::clone(&error_callback_ptr), + ); + + let notification_handler = JackNotificationHandler::new(error_callback_ptr); + + let async_client = client + .activate_async(notification_handler, input_process_handler) + .unwrap(); + + Stream { + playing, + async_client, + input_port_names: port_names, + output_port_names: vec![], + } + } + + pub fn new_output( + client: Rc, + channels: ChannelCount, + data_callback: D, + mut error_callback: E, + ) -> Stream + where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + let mut ports = vec![]; + let mut port_names: Vec = vec![]; + // Create ports + for i in 0..channels { + let port_try = client.register_port(&format!("out_{}", i), jack::AudioOut::default()); + match port_try { + Ok(port) => { + // Get the port name in order to later connect it automatically + if let Ok(port_name) = port.name() { + port_names.push(port_name); + } + // Store the port into a Vec to move to the ProcessHandler + ports.push(port); + } + Err(e) => { + // If port creation failed, send the error back via the error_callback + error_callback( + BackendSpecificError { + description: e.to_string(), + } + .into(), + ); + } + } + } + + let playing = Arc::new(AtomicBool::new(true)); + + let error_callback_ptr = Arc::new(Mutex::new(error_callback)) as ErrorCallbackPtr; + + let output_process_handler = LocalProcessHandler::new( + ports, + vec![], + SampleRate(client.sample_rate() as u32), + client.buffer_size() as usize, + None, + Some(Box::new(data_callback)), + playing.clone(), + Arc::clone(&error_callback_ptr), + ); + + let notification_handler = JackNotificationHandler::new(error_callback_ptr); + + let async_client = client + .activate_async(notification_handler, output_process_handler) + .unwrap(); + + Stream { + playing, + async_client, + input_port_names: vec![], + output_port_names: port_names, + } + } + + /// Connect to the standard system outputs in jack, system:playback_1 and system:playback_2 + /// This has to be done after the client is activated, doing it just after creating the ports doesn't work. + pub fn connect_to_system_outputs(&mut self) { + // Get the system ports + let system_ports = self.async_client.as_client().ports( + Some("system:playback_.*"), + None, + jack::PortFlags::empty(), + ); + + // Connect outputs from this client to the system playback inputs + for i in 0..self.output_port_names.len() { + if i >= system_ports.len() { + break; + } + match self + .async_client + .as_client() + .connect_ports_by_name(&self.output_port_names[i], &system_ports[i]) + { + Ok(_) => (), + Err(e) => println!("Unable to connect to port with error {}", e), + } + } + } + + /// Connect to the standard system outputs in jack, system:capture_1 and system:capture_2 + /// This has to be done after the client is activated, doing it just after creating the ports doesn't work. + pub fn connect_to_system_inputs(&mut self) { + // Get the system ports + let system_ports = self.async_client.as_client().ports( + Some("system:capture_.*"), + None, + jack::PortFlags::empty(), + ); + + // Connect outputs from this client to the system playback inputs + for i in 0..self.input_port_names.len() { + if i >= system_ports.len() { + break; + } + match self + .async_client + .as_client() + .connect_ports_by_name(&system_ports[i], &self.input_port_names[i]) + { + Ok(_) => (), + Err(e) => println!("Unable to connect to port with error {}", e), + } + } + } +} + +impl StreamTrait for Stream { + fn play(&self) -> Result<(), PlayStreamError> { + self.playing.store(true, Ordering::SeqCst); + Ok(()) + } + + fn pause(&self) -> Result<(), PauseStreamError> { + self.playing.store(false, Ordering::SeqCst); + Ok(()) + } +} + +struct LocalProcessHandler { + /// No new ports are allowed to be created after the creation of the LocalProcessHandler as that would invalidate the buffer sizes + out_ports: Vec>, + in_ports: Vec>, + + sample_rate: SampleRate, + buffer_size: usize, + input_data_callback: Option>, + output_data_callback: Option>, + + // JACK audio samples are 32-bit float (unless you do some custom dark magic) + temp_input_buffer: Vec, + temp_output_buffer: Vec, + playing: Arc, + creation_timestamp: std::time::Instant, + /// This should not be called on `process`, only on `buffer_size` because it can block. + error_callback_ptr: ErrorCallbackPtr, +} + +impl LocalProcessHandler { + fn new( + out_ports: Vec>, + in_ports: Vec>, + sample_rate: SampleRate, + buffer_size: usize, + input_data_callback: Option>, + output_data_callback: Option< + Box, + >, + playing: Arc, + error_callback_ptr: ErrorCallbackPtr, + ) -> Self { + // These may be reallocated in the `buffer_size` callback. + let temp_input_buffer = vec![0.0; in_ports.len() * buffer_size]; + let temp_output_buffer = vec![0.0; out_ports.len() * buffer_size]; + + LocalProcessHandler { + out_ports, + in_ports, + sample_rate, + buffer_size, + input_data_callback, + output_data_callback, + temp_input_buffer, + temp_output_buffer, + playing, + creation_timestamp: std::time::Instant::now(), + error_callback_ptr, + } + } +} + +fn temp_buffer_to_data(temp_input_buffer: &mut Vec, total_buffer_size: usize) -> Data { + let slice = &temp_input_buffer[0..total_buffer_size]; + let data = slice.as_ptr() as *mut (); + let len = total_buffer_size; + let data = unsafe { Data::from_parts(data, len, PIPEWIRE_SAMPLE_FORMAT) }; + data +} + +impl jack::ProcessHandler for LocalProcessHandler { + fn process(&mut self, _: &jack::Client, process_scope: &jack::ProcessScope) -> jack::Control { + if !self.playing.load(Ordering::SeqCst) { + return jack::Control::Continue; + } + + // This should be equal to self.buffer_size, but the implementation will + // work even if it is less. Will panic in `temp_buffer_to_data` if greater. + let current_frame_count = process_scope.n_frames() as usize; + + // Get timestamp data + let cycle_times = process_scope.cycle_times(); + let current_start_usecs = match cycle_times { + Ok(times) => times.current_usecs, + Err(_) => { + // jack was unable to get the current time information + // Fall back to using Instants + let now = std::time::Instant::now(); + let duration = now.duration_since(self.creation_timestamp); + duration.as_micros() as u64 + } + }; + let start_cycle_instant = micros_to_stream_instant(current_start_usecs); + let start_callback_instant = start_cycle_instant + .add(frames_to_duration( + process_scope.frames_since_cycle_start() as usize, + self.sample_rate, + )) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + + if let Some(input_callback) = &mut self.input_data_callback { + // Let's get the data from the input ports and run the callback + + let num_in_channels = self.in_ports.len(); + + // Read the data from the input ports into the temporary buffer + // Go through every channel and store its data in the temporary input buffer + for ch_ix in 0..num_in_channels { + let input_channel = &self.in_ports[ch_ix].as_slice(process_scope); + for i in 0..current_frame_count { + self.temp_input_buffer[ch_ix + i * num_in_channels] = input_channel[i]; + } + } + // Create a slice of exactly current_frame_count frames + let data = temp_buffer_to_data( + &mut self.temp_input_buffer, + current_frame_count * num_in_channels, + ); + // Create timestamp + let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; + let duration_since_cycle_start = + frames_to_duration(frames_since_cycle_start, self.sample_rate); + let callback = start_callback_instant + .add(duration_since_cycle_start) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let capture = start_callback_instant; + let timestamp = crate::InputStreamTimestamp { callback, capture }; + let info = crate::InputCallbackInfo { timestamp }; + input_callback(&data, &info); + } + + if let Some(output_callback) = &mut self.output_data_callback { + let num_out_channels = self.out_ports.len(); + + // Create a slice of exactly current_frame_count frames + let mut data = temp_buffer_to_data( + &mut self.temp_output_buffer, + current_frame_count * num_out_channels, + ); + // Create timestamp + let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; + let duration_since_cycle_start = + frames_to_duration(frames_since_cycle_start, self.sample_rate); + let callback = start_callback_instant + .add(duration_since_cycle_start) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let buffer_duration = frames_to_duration(current_frame_count, self.sample_rate); + let playback = start_cycle_instant + .add(buffer_duration) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let timestamp = crate::OutputStreamTimestamp { callback, playback }; + let info = crate::OutputCallbackInfo { timestamp }; + output_callback(&mut data, &info); + + // Deinterlace + for ch_ix in 0..num_out_channels { + let output_channel = &mut self.out_ports[ch_ix].as_mut_slice(process_scope); + for i in 0..current_frame_count { + output_channel[i] = self.temp_output_buffer[ch_ix + i * num_out_channels]; + } + } + } + + // Continue as normal + jack::Control::Continue + } + + fn buffer_size(&mut self, _: &jack::Client, size: jack::Frames) -> jack::Control { + // The `buffer_size` callback is actually called on the process thread, but + // it does not need to be suitable for real-time use. Thus we can simply allocate + // new buffers here. It is also fine to call the error callback. + // Details: https://github.com/RustAudio/rust-jack/issues/137 + let new_size = size as usize; + if new_size != self.buffer_size { + self.buffer_size = new_size; + self.temp_input_buffer = vec![0.0; self.in_ports.len() * new_size]; + self.temp_output_buffer = vec![0.0; self.out_ports.len() * new_size]; + let description = format!("buffer size changed to: {}", new_size); + if let Ok(mut mutex_guard) = self.error_callback_ptr.lock() { + let err = &mut *mutex_guard; + err(BackendSpecificError { description }.into()); + } + } + + jack::Control::Continue + } +} + +fn micros_to_stream_instant(micros: u64) -> crate::StreamInstant { + let nanos = micros * 1000; + let secs = micros / 1_000_000; + let subsec_nanos = nanos - secs * 1_000_000_000; + crate::StreamInstant::new(secs as i64, subsec_nanos as u32) +} + +// Convert the given duration in frames at the given sample rate to a `std::time::Duration`. +fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration { + let secsf = frames as f64 / rate.0 as f64; + let secs = secsf as u64; + let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32; + std::time::Duration::new(secs, nanos) +} + +/// Receives notifications from the JACK server. It is unclear if this may be run concurrent with itself under JACK2 specs +/// so it needs to be Sync. +struct JackNotificationHandler { + error_callback_ptr: ErrorCallbackPtr, + init_sample_rate_flag: Arc, +} + +impl JackNotificationHandler { + pub fn new(error_callback_ptr: ErrorCallbackPtr) -> Self { + JackNotificationHandler { + error_callback_ptr, + init_sample_rate_flag: Arc::new(AtomicBool::new(false)), + } + } + + fn send_error(&mut self, description: String) { + // This thread isn't the audio thread, it's fine to block + if let Ok(mut mutex_guard) = self.error_callback_ptr.lock() { + let err = &mut *mutex_guard; + err(BackendSpecificError { description }.into()); + } + } +} + +impl jack::NotificationHandler for JackNotificationHandler { + fn shutdown(&mut self, _status: jack::ClientStatus, reason: &str) { + self.send_error(format!("JACK was shut down for reason: {}", reason)); + } + + fn sample_rate(&mut self, _: &jack::Client, srate: jack::Frames) -> jack::Control { + match self.init_sample_rate_flag.load(Ordering::SeqCst) { + false => { + // One of these notifications is sent every time a client is started. + self.init_sample_rate_flag.store(true, Ordering::SeqCst); + jack::Control::Continue + } + true => { + self.send_error(format!("sample rate changed to: {}", srate)); + // Since CPAL currently has no way of signaling a sample rate change in order to make + // all necessary changes that would bring we choose to quit. + jack::Control::Quit + } + } + } + + fn xrun(&mut self, _: &jack::Client) -> jack::Control { + self.send_error(String::from("xrun (buffer over or under run)")); + jack::Control::Continue + } +} From 15d7987228b10f110a8f0b53b4177da17b55c352 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maria=C5=84ski?= Date: Wed, 9 Mar 2022 15:40:12 +0100 Subject: [PATCH 4/7] some fixes, node creation --- Cargo.toml | 2 +- src/host/pipewire/conn.rs | 216 +++++++++++++++++++++++++------ src/host/pipewire/device.rs | 20 +-- src/host/pipewire/mod.rs | 9 +- src/host/pipewire/stream.rs | 250 ++++++++++++++++++------------------ 5 files changed, 320 insertions(+), 177 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 696ff10db..e22094296 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,4 +82,4 @@ name = "feedback" name = "record_wav" [[example]] -name = "synth_tones" +name = "synth_tones" \ No newline at end of file diff --git a/src/host/pipewire/conn.rs b/src/host/pipewire/conn.rs index fe4e0e3f8..1f3000d6c 100644 --- a/src/host/pipewire/conn.rs +++ b/src/host/pipewire/conn.rs @@ -2,26 +2,41 @@ extern crate pipewire; use self::pipewire::{ metadata::Metadata, + node::Node, prelude::*, registry::{GlobalObject, Registry}, - spa::ForeignDict, + spa::{Direction, ForeignDict}, types::ObjectType, + Core, MainLoop, }; use std::{ + borrow::BorrowMut, cell::{Cell, RefCell}, rc::Rc, sync::mpsc, thread, }; +use super::device::DeviceType; + +#[derive(Debug)] enum Message { Terminate, GetSettings, + CreateDeviceNode { + name: String, + device_type: DeviceType, + }, } enum MessageRepl { Settings(Settings), + NodeInfo(NodeInfo), +} + +pub struct NodeInfo { + pub name: String, } pub struct PWClient { @@ -34,7 +49,7 @@ impl PWClient { let (main_sender, main_receiver) = mpsc::channel(); let (pw_sender, pw_receiver) = pipewire::channel::channel(); - let pw_thread = thread::spawn(move || pw_thread(main_sender, pw_receiver)); + let _pw_thread = thread::spawn(move || pw_thread(main_sender, pw_receiver)); Self { pw_sender, @@ -42,24 +57,44 @@ impl PWClient { } } - pub fn get_settings(&self) -> Settings { - self.pw_sender.send(Message::GetSettings); + pub fn get_settings(&self) -> Result { + match self.pw_sender.send(Message::GetSettings) { + Ok(_) => match self.main_receiver.recv() { + Ok(MessageRepl::Settings(settings)) => Ok(settings), + Err(err) => Err(format!("{:?}", err)), + _ => Err(format!("")), + }, + Err(err) => Err(format!("{:?}", err)), + } + } - if let MessageRepl::Settings(settings) = self.main_receiver.recv().expect("Reply") { - settings - } else { - Settings::default() - } + pub fn create_device_node( + &self, + name: String, + device_type: DeviceType, + ) -> Result { + match self + .pw_sender + .send(Message::CreateDeviceNode { name, device_type }) + { + Ok(_) => match self.main_receiver.recv() { + Ok(MessageRepl::NodeInfo(info)) => Ok(info), + Err(err) => Err(format!("{:?}", err)), + _ => Err(format!("")), + }, + Err(err) => Err(format!("{:?}", err)), + } } } #[derive(Default)] struct State { settings: Settings, + running: bool, } -#[derive(Default, Clone)] -struct Settings { +#[derive(Default, Clone, Debug)] +pub struct Settings { pub sample_rate: u32, pub min_buffer_size: u32, pub max_buffer_size: u32, @@ -70,43 +105,104 @@ fn pw_thread( main_sender: mpsc::Sender, pw_receiver: pipewire::channel::Receiver, ) { - let state = Rc::new(State::default()); - // let state = Rc::new(RefCell::new(State::default())); + // let state = Rc::new(State::default()); + let state = Rc::new(RefCell::new(State::default())); let mainloop = pipewire::MainLoop::new().expect("Failed to create PipeWire Mainloop"); let context = pipewire::Context::new(&mainloop).expect("Failed to create PipeWire Context"); - let core = context - .connect(None) - .expect("Failed to connect to PipeWire"); + let core = Rc::new( + context + .connect(None) + .expect("Failed to connect to PipeWire"), + ); let registry = Rc::new(core.get_registry().expect("Failed to get Registry")); - let _receiver = pw_receiver.attach(&mainloop, |msg| { + let _receiver = pw_receiver.attach(&mainloop, { let mainloop = mainloop.clone(); + let state = state.clone(); + let main_sender = main_sender.clone(); + let core = core.clone(); - match msg { + move |msg| match msg { Message::Terminate => mainloop.quit(), Message::GetSettings => { - main_sender.send(MessageRepl::Settings(state.settings.clone())); + let settings = state.borrow().settings.clone(); + main_sender.send(MessageRepl::Settings(settings)); + } + Message::CreateDeviceNode { name, device_type } => { + println!("Creating device"); + let node: Node = core + .create_object( + "adapter", //node_factory.get().expect("No node factory found"), + &pipewire::properties! { + *pipewire::keys::NODE_NAME => name.clone(), + *pipewire::keys::FACTORY_NAME => "support.null-audio-sink", + // *pipewire::keys::MEDIA_CLASS => match device_type { + // DeviceType::InputDevice => "Audio/Sink", + // DeviceType::OutputDevice => "Audio/Source" + // }, + *pipewire::keys::MEDIA_CLASS => "Audio/Sink", + // Don't remove the object on the remote when we destroy our proxy. + // *pipewire::keys::OBJECT_LINGER => "1" + }, + ) + .expect("Failed to create object"); + + let _list = node.add_listener_local() + .info(|f| { + println!("{:?}", f); + }) + .param(|a, b, c, d| { + println!("{}, {}, {}, {}", a,b,c,d); + }) + .register(); + + do_roundtrip(&mainloop, &core, &state); + println!("{:?}", node); + + main_sender.send(MessageRepl::NodeInfo(NodeInfo { name })); + + state.as_ref().borrow_mut().running = false; + mainloop.quit(); } } }); - let state_clone = state.clone(); let _listener = registry .add_listener_local() - .global(|global| match global.type_ { - ObjectType::Metadata => handle_metadata(global, state_clone, ®istry), - _ => {} - }); + .global({ + let state = state.clone(); + let registry = registry.clone(); + let mainloop = mainloop.clone(); + let core = core.clone(); + + move |global| match global.type_ { + ObjectType::Metadata => { + handle_metadata(global, state.clone(), ®istry, &mainloop, &core) + } + _ => {} + } + }) + .register(); + + do_roundtrip(&mainloop, &core, &state); - mainloop.run(); + loop { + if state.borrow().running { + println!("LOOP START"); + mainloop.run(); + println!("LOOP END"); + } + } } fn handle_metadata( metadata: &GlobalObject, - state: Rc, + state: Rc>, registry: &Rc, + mainloop: &MainLoop, + core: &Rc, ) { let props = metadata .props @@ -117,24 +213,66 @@ fn handle_metadata( Some("settings") => { let settings: Metadata = registry.bind(metadata).expect("Metadata"); - settings + let _listener = settings .add_listener_local() - .property(|_, key, _, value| { - if let Some(value) = value { - if let Ok(value) = value.parse::() { - match key { - Some("clock.rate") => state.settings.sample_rate = value, - Some("clock.quantum") => state.settings.default_buffer_size = value, - Some("clock.min-quantum") => state.settings.min_buffer_size = value, - Some("clock.max-quantum") => state.settings.max_buffer_size = value, - None => {} - }; + .property({ + let state = state.clone(); + move |_, key, _, value| { + let mut state = state.as_ref().borrow_mut(); + if let Some(value) = value { + if let Ok(value) = value.parse::() { + match key { + Some("clock.rate") => state.settings.sample_rate = value, + Some("clock.quantum") => { + state.settings.default_buffer_size = value + } + Some("clock.min-quantum") => { + state.settings.min_buffer_size = value + } + Some("clock.max-quantum") => { + state.settings.max_buffer_size = value + } + _ => {} + }; + } } + 0 } - 0 }) .register(); + + do_roundtrip(mainloop, core, &state); } - None => {} + _ => {} }; } + +fn do_roundtrip(mainloop: &pipewire::MainLoop, core: &pipewire::Core, state: &Rc>) { + let done = Rc::new(Cell::new(false)); + let done_clone = done.clone(); + let loop_clone = mainloop.clone(); + let state = state.clone(); + + state.as_ref().borrow_mut().running = false; + mainloop.quit(); + + // Trigger the sync event. The server's answer won't be processed until we start the main loop, + // so we can safely do this before setting up a callback. This lets us avoid using a Cell. + let pending = core.sync(0).expect("sync failed"); + + let _listener_core = core + .add_listener_local() + .done(move |id, seq| { + if id == pipewire::PW_ID_CORE && seq == pending { + done_clone.set(true); + loop_clone.quit(); + } + }) + .register(); + + while !done.get() { + mainloop.run(); + } + + state.as_ref().borrow_mut().running = true; +} diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index 1c5e004f6..6632395f0 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -39,12 +39,16 @@ impl Device { name: String, connect_ports_automatically: bool, device_type: DeviceType, - client: super::conn::PWClient, + client: Rc, ) -> Result { - let settings = client.get_settings(); + while client.get_settings().and_then(|s| if s.sample_rate == 0 {Err(String::new())} else {Ok(true)} ).is_err() {} + + let settings = client.get_settings().unwrap(); + + let info = client.create_device_node(name, device_type.clone()).expect("Error creating device"); Ok(Device { - name: "cpal_client".to_string(), + name: info.name, sample_rate: SampleRate(settings.sample_rate), buffer_size: SupportedBufferSize::Range { min: settings.min_buffer_size, @@ -52,14 +56,14 @@ impl Device { }, device_type, connect_ports_automatically, - client: Rc::new(client) + client }) } pub fn default_output_device( name: &str, connect_ports_automatically: bool, - client: super::conn::PWClient, + client: Rc, ) -> Result { let output_client_name = format!("{}_out", name); Device::new_device( @@ -73,7 +77,7 @@ impl Device { pub fn default_input_device( name: &str, connect_ports_automatically: bool, - client: super::conn::PWClient, + client: Rc, ) -> Result { let input_client_name = format!("{}_in", name); Device::new_device( @@ -183,7 +187,7 @@ impl DeviceTrait for Device { return Err(BuildStreamError::StreamConfigNotSupported); } - let mut stream = Stream::new_input(self.client, conf.channels, data_callback, error_callback); + let mut stream = Stream::new_input(self.client.clone(), conf.channels, data_callback, error_callback); if self.connect_ports_automatically { stream.connect_to_system_inputs(); @@ -211,7 +215,7 @@ impl DeviceTrait for Device { return Err(BuildStreamError::StreamConfigNotSupported); } - let mut stream = Stream::new_output(self.client, conf.channels, data_callback, error_callback); + let mut stream = Stream::new_output(self.client.clone(), conf.channels, data_callback, error_callback); if self.connect_ports_automatically { stream.connect_to_system_outputs(); diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs index 67613f5b1..07b970117 100644 --- a/src/host/pipewire/mod.rs +++ b/src/host/pipewire/mod.rs @@ -1,5 +1,6 @@ extern crate pipewire; +use std::rc::Rc; use std::sync::mpsc; use crate::{DevicesError, SampleFormat, SupportedStreamConfigRange}; @@ -29,12 +30,12 @@ pub struct Host { /// A list of the devices that have been created from this Host. devices_created: Vec, - client: conn::PWClient + client: Rc } impl Host { pub fn new() -> Result { - let client = conn::PWClient::new(); + let client = Rc::new(conn::PWClient::new()); let mut host = Host { name: "cpal_client".to_owned(), @@ -67,7 +68,7 @@ impl Host { let in_device_res = Device::default_input_device( &self.name, self.connect_ports_automatically, - self.client + self.client.clone() ); match in_device_res { @@ -80,7 +81,7 @@ impl Host { let out_device_res = Device::default_output_device( &self.name, self.connect_ports_automatically, - self.client + self.client.clone() ); match out_device_res { Ok(device) => self.devices_created.push(device), diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index 8876db8da..dc4280726 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -16,10 +16,10 @@ type ErrorCallbackPtr = Arc>; pub struct Stream { // TODO: It might be faster to send a message when playing/pausing than to check this every iteration playing: Arc, - async_client: jack::AsyncClient, + // async_client: jack::AsyncClient, // Port names are stored in order to connect them to other ports in jack automatically - input_port_names: Vec, - output_port_names: Vec, + // input_port_names: Vec, + // output_port_names: Vec, } impl Stream { @@ -34,58 +34,58 @@ impl Stream { D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static, { - let mut ports = vec![]; - let mut port_names: Vec = vec![]; + // let mut ports = vec![]; + // let mut port_names: Vec = vec![]; // Create ports - for i in 0..channels { - let port_try = client.register_port(&format!("in_{}", i), jack::AudioIn::default()); - match port_try { - Ok(port) => { - // Get the port name in order to later connect it automatically - if let Ok(port_name) = port.name() { - port_names.push(port_name); - } - // Store the port into a Vec to move to the ProcessHandler - ports.push(port); - } - Err(e) => { - // If port creation failed, send the error back via the error_callback - error_callback( - BackendSpecificError { - description: e.to_string(), - } - .into(), - ); - } - } - } + // for i in 0..channels { + // let port_try = client.register_port(&format!("in_{}", i), jack::AudioIn::default()); + // match port_try { + // Ok(port) => { + // // Get the port name in order to later connect it automatically + // if let Ok(port_name) = port.name() { + // port_names.push(port_name); + // } + // // Store the port into a Vec to move to the ProcessHandler + // ports.push(port); + // } + // Err(e) => { + // // If port creation failed, send the error back via the error_callback + // error_callback( + // BackendSpecificError { + // description: e.to_string(), + // } + // .into(), + // ); + // } + // } + // } let playing = Arc::new(AtomicBool::new(true)); let error_callback_ptr = Arc::new(Mutex::new(error_callback)) as ErrorCallbackPtr; - let input_process_handler = LocalProcessHandler::new( - vec![], - ports, - SampleRate(client.sample_rate() as u32), - client.buffer_size() as usize, - Some(Box::new(data_callback)), - None, - playing.clone(), - Arc::clone(&error_callback_ptr), - ); + // let input_process_handler = LocalProcessHandler::new( + // vec![], + // ports, + // SampleRate(client.sample_rate() as u32), + // client.buffer_size() as usize, + // Some(Box::new(data_callback)), + // None, + // playing.clone(), + // Arc::clone(&error_callback_ptr), + // ); let notification_handler = JackNotificationHandler::new(error_callback_ptr); - let async_client = client - .activate_async(notification_handler, input_process_handler) - .unwrap(); + // let async_client = client + // .activate_async(notification_handler, input_process_handler) + // .unwrap(); Stream { playing, - async_client, - input_port_names: port_names, - output_port_names: vec![], + // async_client, + // input_port_names: port_names, + // output_port_names: vec![], } } @@ -99,58 +99,58 @@ impl Stream { D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static, { - let mut ports = vec![]; - let mut port_names: Vec = vec![]; - // Create ports - for i in 0..channels { - let port_try = client.register_port(&format!("out_{}", i), jack::AudioOut::default()); - match port_try { - Ok(port) => { - // Get the port name in order to later connect it automatically - if let Ok(port_name) = port.name() { - port_names.push(port_name); - } - // Store the port into a Vec to move to the ProcessHandler - ports.push(port); - } - Err(e) => { - // If port creation failed, send the error back via the error_callback - error_callback( - BackendSpecificError { - description: e.to_string(), - } - .into(), - ); - } - } - } + // let mut ports = vec![]; + // let mut port_names: Vec = vec![]; + // // Create ports + // for i in 0..channels { + // let port_try = client.register_port(&format!("out_{}", i), jack::AudioOut::default()); + // match port_try { + // Ok(port) => { + // // Get the port name in order to later connect it automatically + // if let Ok(port_name) = port.name() { + // port_names.push(port_name); + // } + // // Store the port into a Vec to move to the ProcessHandler + // ports.push(port); + // } + // Err(e) => { + // // If port creation failed, send the error back via the error_callback + // error_callback( + // BackendSpecificError { + // description: e.to_string(), + // } + // .into(), + // ); + // } + // } + // } let playing = Arc::new(AtomicBool::new(true)); let error_callback_ptr = Arc::new(Mutex::new(error_callback)) as ErrorCallbackPtr; - let output_process_handler = LocalProcessHandler::new( - ports, - vec![], - SampleRate(client.sample_rate() as u32), - client.buffer_size() as usize, - None, - Some(Box::new(data_callback)), - playing.clone(), - Arc::clone(&error_callback_ptr), - ); + // let output_process_handler = LocalProcessHandler::new( + // ports, + // vec![], + // SampleRate(client.sample_rate() as u32), + // client.buffer_size() as usize, + // None, + // Some(Box::new(data_callback)), + // playing.clone(), + // Arc::clone(&error_callback_ptr), + // ); - let notification_handler = JackNotificationHandler::new(error_callback_ptr); + // let notification_handler = JackNotificationHandler::new(error_callback_ptr); - let async_client = client - .activate_async(notification_handler, output_process_handler) - .unwrap(); + // let async_client = client + // .activate_async(notification_handler, output_process_handler) + // .unwrap(); Stream { playing, - async_client, - input_port_names: vec![], - output_port_names: port_names, + // async_client, + // input_port_names: vec![], + // output_port_names: port_names, } } @@ -158,52 +158,52 @@ impl Stream { /// This has to be done after the client is activated, doing it just after creating the ports doesn't work. pub fn connect_to_system_outputs(&mut self) { // Get the system ports - let system_ports = self.async_client.as_client().ports( - Some("system:playback_.*"), - None, - jack::PortFlags::empty(), - ); - - // Connect outputs from this client to the system playback inputs - for i in 0..self.output_port_names.len() { - if i >= system_ports.len() { - break; - } - match self - .async_client - .as_client() - .connect_ports_by_name(&self.output_port_names[i], &system_ports[i]) - { - Ok(_) => (), - Err(e) => println!("Unable to connect to port with error {}", e), - } - } + // let system_ports = self.async_client.as_client().ports( + // Some("system:playback_.*"), + // None, + // jack::PortFlags::empty(), + // ); + + // // Connect outputs from this client to the system playback inputs + // for i in 0..self.output_port_names.len() { + // if i >= system_ports.len() { + // break; + // } + // match self + // .async_client + // .as_client() + // .connect_ports_by_name(&self.output_port_names[i], &system_ports[i]) + // { + // Ok(_) => (), + // Err(e) => println!("Unable to connect to port with error {}", e), + // } + // } } /// Connect to the standard system outputs in jack, system:capture_1 and system:capture_2 /// This has to be done after the client is activated, doing it just after creating the ports doesn't work. pub fn connect_to_system_inputs(&mut self) { // Get the system ports - let system_ports = self.async_client.as_client().ports( - Some("system:capture_.*"), - None, - jack::PortFlags::empty(), - ); - - // Connect outputs from this client to the system playback inputs - for i in 0..self.input_port_names.len() { - if i >= system_ports.len() { - break; - } - match self - .async_client - .as_client() - .connect_ports_by_name(&system_ports[i], &self.input_port_names[i]) - { - Ok(_) => (), - Err(e) => println!("Unable to connect to port with error {}", e), - } - } + // let system_ports = self.async_client.as_client().ports( + // Some("system:capture_.*"), + // None, + // jack::PortFlags::empty(), + // ); + + // // Connect outputs from this client to the system playback inputs + // for i in 0..self.input_port_names.len() { + // if i >= system_ports.len() { + // break; + // } + // match self + // .async_client + // .as_client() + // .connect_ports_by_name(&system_ports[i], &self.input_port_names[i]) + // { + // Ok(_) => (), + // Err(e) => println!("Unable to connect to port with error {}", e), + // } + // } } } From 8f0bba1908afcc05faccdcaeaec3297b9aee891b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maria=C5=84ski?= Date: Wed, 9 Mar 2022 22:41:54 +0100 Subject: [PATCH 5/7] cleanup, keep references to node/setting proxies and listeners, remove do_roundtrip --- src/host/pipewire/conn.rs | 146 +++++++++++++++++++----------------- src/host/pipewire/device.rs | 2 +- 2 files changed, 79 insertions(+), 69 deletions(-) diff --git a/src/host/pipewire/conn.rs b/src/host/pipewire/conn.rs index 1f3000d6c..75b218dee 100644 --- a/src/host/pipewire/conn.rs +++ b/src/host/pipewire/conn.rs @@ -1,9 +1,10 @@ extern crate pipewire; use self::pipewire::{ - metadata::Metadata, - node::Node, + metadata::{Metadata, MetadataListener}, + node::{Node, NodeListener}, prelude::*, + proxy::Listener, registry::{GlobalObject, Registry}, spa::{Direction, ForeignDict}, types::ObjectType, @@ -13,9 +14,11 @@ use self::pipewire::{ use std::{ borrow::BorrowMut, cell::{Cell, RefCell}, + collections::HashMap, rc::Rc, sync::mpsc, thread, + time::Duration, }; use super::device::DeviceType; @@ -27,6 +30,7 @@ enum Message { CreateDeviceNode { name: String, device_type: DeviceType, + autoconnect: bool, }, } @@ -72,11 +76,13 @@ impl PWClient { &self, name: String, device_type: DeviceType, + connect_ports_automatically: bool, ) -> Result { - match self - .pw_sender - .send(Message::CreateDeviceNode { name, device_type }) - { + match self.pw_sender.send(Message::CreateDeviceNode { + name, + device_type, + autoconnect: connect_ports_automatically, + }) { Ok(_) => match self.main_receiver.recv() { Ok(MessageRepl::NodeInfo(info)) => Ok(info), Err(err) => Err(format!("{:?}", err)), @@ -90,7 +96,7 @@ impl PWClient { #[derive(Default)] struct State { settings: Settings, - running: bool, + nodes: Vec, } #[derive(Default, Clone, Debug)] @@ -101,12 +107,25 @@ pub struct Settings { pub default_buffer_size: u32, } +enum ProxyItem { + Metadata { + _proxy: Metadata, + _listener: MetadataListener, + }, + Node { + _proxy: Node, + _listener: NodeListener, + }, +} + fn pw_thread( main_sender: mpsc::Sender, pw_receiver: pipewire::channel::Receiver, ) { + pipewire::init(); // let state = Rc::new(State::default()); let state = Rc::new(RefCell::new(State::default())); + let proxies = Rc::new(RefCell::new(HashMap::::new())); let mainloop = pipewire::MainLoop::new().expect("Failed to create PipeWire Mainloop"); @@ -130,79 +149,94 @@ fn pw_thread( let settings = state.borrow().settings.clone(); main_sender.send(MessageRepl::Settings(settings)); } - Message::CreateDeviceNode { name, device_type } => { - println!("Creating device"); + Message::CreateDeviceNode { + name, + device_type, + autoconnect, + } => { let node: Node = core .create_object( "adapter", //node_factory.get().expect("No node factory found"), &pipewire::properties! { *pipewire::keys::NODE_NAME => name.clone(), *pipewire::keys::FACTORY_NAME => "support.null-audio-sink", - // *pipewire::keys::MEDIA_CLASS => match device_type { - // DeviceType::InputDevice => "Audio/Sink", - // DeviceType::OutputDevice => "Audio/Source" - // }, - *pipewire::keys::MEDIA_CLASS => "Audio/Sink", + *pipewire::keys::MEDIA_TYPE => "Audio", + *pipewire::keys::MEDIA_CATEGORY => match device_type { + DeviceType::InputDevice => "Capture", + DeviceType::OutputDevice => "Playback" + }, + *pipewire::keys::NODE_AUTOCONNECT => match autoconnect { + false => "false", + true => "true", + }, // Don't remove the object on the remote when we destroy our proxy. // *pipewire::keys::OBJECT_LINGER => "1" }, ) .expect("Failed to create object"); - let _list = node.add_listener_local() + let _listener = node + .add_listener_local() .info(|f| { println!("{:?}", f); }) .param(|a, b, c, d| { - println!("{}, {}, {}, {}", a,b,c,d); + println!("{}, {}, {}, {}", a, b, c, d); }) .register(); - do_roundtrip(&mainloop, &core, &state); println!("{:?}", node); - main_sender.send(MessageRepl::NodeInfo(NodeInfo { name })); + state.as_ref().borrow_mut().nodes.push(node); + + // proxies.as_ref().borrow_mut().insert( + // node.proxy.id(), + // ProxyItem::Node { + // _proxy: node, + // _listener, + // }, + // ); - state.as_ref().borrow_mut().running = false; - mainloop.quit(); + main_sender.send(MessageRepl::NodeInfo(NodeInfo { name })); } } }); - let _listener = registry + let _reg_listener = registry .add_listener_local() .global({ let state = state.clone(); let registry = registry.clone(); - let mainloop = mainloop.clone(); - let core = core.clone(); + let proxies = proxies.clone(); move |global| match global.type_ { - ObjectType::Metadata => { - handle_metadata(global, state.clone(), ®istry, &mainloop, &core) - } + ObjectType::Metadata => handle_metadata(global, &state, ®istry, &proxies), _ => {} } }) .register(); - do_roundtrip(&mainloop, &core, &state); + // let timer = mainloop.add_timer({ + // move |_| { + // } + // }); - loop { - if state.borrow().running { - println!("LOOP START"); - mainloop.run(); - println!("LOOP END"); - } - } + // timer + // .update_timer( + // Some(Duration::from_millis(500)), + // Some(Duration::from_secs(1)), + // ) + // .into_result() + // .expect("FU"); + + mainloop.run(); } fn handle_metadata( metadata: &GlobalObject, - state: Rc>, + state: &Rc>, registry: &Rc, - mainloop: &MainLoop, - core: &Rc, + proxies: &Rc>>, ) { let props = metadata .props @@ -241,38 +275,14 @@ fn handle_metadata( }) .register(); - do_roundtrip(mainloop, core, &state); + proxies.as_ref().borrow_mut().insert( + metadata.id, + ProxyItem::Metadata { + _proxy: settings, + _listener, + }, + ); } _ => {} }; } - -fn do_roundtrip(mainloop: &pipewire::MainLoop, core: &pipewire::Core, state: &Rc>) { - let done = Rc::new(Cell::new(false)); - let done_clone = done.clone(); - let loop_clone = mainloop.clone(); - let state = state.clone(); - - state.as_ref().borrow_mut().running = false; - mainloop.quit(); - - // Trigger the sync event. The server's answer won't be processed until we start the main loop, - // so we can safely do this before setting up a callback. This lets us avoid using a Cell. - let pending = core.sync(0).expect("sync failed"); - - let _listener_core = core - .add_listener_local() - .done(move |id, seq| { - if id == pipewire::PW_ID_CORE && seq == pending { - done_clone.set(true); - loop_clone.quit(); - } - }) - .register(); - - while !done.get() { - mainloop.run(); - } - - state.as_ref().borrow_mut().running = true; -} diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index 6632395f0..9886e4c2b 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -45,7 +45,7 @@ impl Device { let settings = client.get_settings().unwrap(); - let info = client.create_device_node(name, device_type.clone()).expect("Error creating device"); + let info = client.create_device_node(name, device_type.clone(), connect_ports_automatically).expect("Error creating device"); Ok(Device { name: info.name, From 05dd14314aa282cd99cf76e9942e704f85e04bf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maria=C5=84ski?= Date: Thu, 10 Mar 2022 00:50:24 +0100 Subject: [PATCH 6/7] make stream.rs compile without jack feature --- src/host/pipewire/stream.rs | 310 ++++++++++++++++++------------------ 1 file changed, 155 insertions(+), 155 deletions(-) diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index dc4280726..f0bc1a892 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -221,8 +221,8 @@ impl StreamTrait for Stream { struct LocalProcessHandler { /// No new ports are allowed to be created after the creation of the LocalProcessHandler as that would invalidate the buffer sizes - out_ports: Vec>, - in_ports: Vec>, + // out_ports: Vec>, + // in_ports: Vec>, sample_rate: SampleRate, buffer_size: usize, @@ -230,8 +230,8 @@ struct LocalProcessHandler { output_data_callback: Option>, // JACK audio samples are 32-bit float (unless you do some custom dark magic) - temp_input_buffer: Vec, - temp_output_buffer: Vec, + // temp_input_buffer: Vec, + // temp_output_buffer: Vec, playing: Arc, creation_timestamp: std::time::Instant, /// This should not be called on `process`, only on `buffer_size` because it can block. @@ -240,8 +240,8 @@ struct LocalProcessHandler { impl LocalProcessHandler { fn new( - out_ports: Vec>, - in_ports: Vec>, + // out_ports: Vec>, + // in_ports: Vec>, sample_rate: SampleRate, buffer_size: usize, input_data_callback: Option>, @@ -252,18 +252,18 @@ impl LocalProcessHandler { error_callback_ptr: ErrorCallbackPtr, ) -> Self { // These may be reallocated in the `buffer_size` callback. - let temp_input_buffer = vec![0.0; in_ports.len() * buffer_size]; - let temp_output_buffer = vec![0.0; out_ports.len() * buffer_size]; + // let temp_input_buffer = vec![0.0; in_ports.len() * buffer_size]; + // let temp_output_buffer = vec![0.0; out_ports.len() * buffer_size]; LocalProcessHandler { - out_ports, - in_ports, + // out_ports, + // in_ports, sample_rate, buffer_size, input_data_callback, output_data_callback, - temp_input_buffer, - temp_output_buffer, + // temp_input_buffer, + // temp_output_buffer, playing, creation_timestamp: std::time::Instant::now(), error_callback_ptr, @@ -279,123 +279,123 @@ fn temp_buffer_to_data(temp_input_buffer: &mut Vec, total_buffer_size: usiz data } -impl jack::ProcessHandler for LocalProcessHandler { - fn process(&mut self, _: &jack::Client, process_scope: &jack::ProcessScope) -> jack::Control { - if !self.playing.load(Ordering::SeqCst) { - return jack::Control::Continue; - } - - // This should be equal to self.buffer_size, but the implementation will - // work even if it is less. Will panic in `temp_buffer_to_data` if greater. - let current_frame_count = process_scope.n_frames() as usize; - - // Get timestamp data - let cycle_times = process_scope.cycle_times(); - let current_start_usecs = match cycle_times { - Ok(times) => times.current_usecs, - Err(_) => { - // jack was unable to get the current time information - // Fall back to using Instants - let now = std::time::Instant::now(); - let duration = now.duration_since(self.creation_timestamp); - duration.as_micros() as u64 - } - }; - let start_cycle_instant = micros_to_stream_instant(current_start_usecs); - let start_callback_instant = start_cycle_instant - .add(frames_to_duration( - process_scope.frames_since_cycle_start() as usize, - self.sample_rate, - )) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); - - if let Some(input_callback) = &mut self.input_data_callback { - // Let's get the data from the input ports and run the callback - - let num_in_channels = self.in_ports.len(); - - // Read the data from the input ports into the temporary buffer - // Go through every channel and store its data in the temporary input buffer - for ch_ix in 0..num_in_channels { - let input_channel = &self.in_ports[ch_ix].as_slice(process_scope); - for i in 0..current_frame_count { - self.temp_input_buffer[ch_ix + i * num_in_channels] = input_channel[i]; - } - } - // Create a slice of exactly current_frame_count frames - let data = temp_buffer_to_data( - &mut self.temp_input_buffer, - current_frame_count * num_in_channels, - ); - // Create timestamp - let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; - let duration_since_cycle_start = - frames_to_duration(frames_since_cycle_start, self.sample_rate); - let callback = start_callback_instant - .add(duration_since_cycle_start) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); - let capture = start_callback_instant; - let timestamp = crate::InputStreamTimestamp { callback, capture }; - let info = crate::InputCallbackInfo { timestamp }; - input_callback(&data, &info); - } - - if let Some(output_callback) = &mut self.output_data_callback { - let num_out_channels = self.out_ports.len(); - - // Create a slice of exactly current_frame_count frames - let mut data = temp_buffer_to_data( - &mut self.temp_output_buffer, - current_frame_count * num_out_channels, - ); - // Create timestamp - let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; - let duration_since_cycle_start = - frames_to_duration(frames_since_cycle_start, self.sample_rate); - let callback = start_callback_instant - .add(duration_since_cycle_start) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); - let buffer_duration = frames_to_duration(current_frame_count, self.sample_rate); - let playback = start_cycle_instant - .add(buffer_duration) - .expect("`playback` occurs beyond representation supported by `StreamInstant`"); - let timestamp = crate::OutputStreamTimestamp { callback, playback }; - let info = crate::OutputCallbackInfo { timestamp }; - output_callback(&mut data, &info); - - // Deinterlace - for ch_ix in 0..num_out_channels { - let output_channel = &mut self.out_ports[ch_ix].as_mut_slice(process_scope); - for i in 0..current_frame_count { - output_channel[i] = self.temp_output_buffer[ch_ix + i * num_out_channels]; - } - } - } - - // Continue as normal - jack::Control::Continue - } - - fn buffer_size(&mut self, _: &jack::Client, size: jack::Frames) -> jack::Control { - // The `buffer_size` callback is actually called on the process thread, but - // it does not need to be suitable for real-time use. Thus we can simply allocate - // new buffers here. It is also fine to call the error callback. - // Details: https://github.com/RustAudio/rust-jack/issues/137 - let new_size = size as usize; - if new_size != self.buffer_size { - self.buffer_size = new_size; - self.temp_input_buffer = vec![0.0; self.in_ports.len() * new_size]; - self.temp_output_buffer = vec![0.0; self.out_ports.len() * new_size]; - let description = format!("buffer size changed to: {}", new_size); - if let Ok(mut mutex_guard) = self.error_callback_ptr.lock() { - let err = &mut *mutex_guard; - err(BackendSpecificError { description }.into()); - } - } - - jack::Control::Continue - } -} +// impl jack::ProcessHandler for LocalProcessHandler { +// fn process(&mut self, _: &jack::Client, process_scope: &jack::ProcessScope) -> jack::Control { +// if !self.playing.load(Ordering::SeqCst) { +// return jack::Control::Continue; +// } + +// // This should be equal to self.buffer_size, but the implementation will +// // work even if it is less. Will panic in `temp_buffer_to_data` if greater. +// let current_frame_count = process_scope.n_frames() as usize; + +// // Get timestamp data +// let cycle_times = process_scope.cycle_times(); +// let current_start_usecs = match cycle_times { +// Ok(times) => times.current_usecs, +// Err(_) => { +// // jack was unable to get the current time information +// // Fall back to using Instants +// let now = std::time::Instant::now(); +// let duration = now.duration_since(self.creation_timestamp); +// duration.as_micros() as u64 +// } +// }; +// let start_cycle_instant = micros_to_stream_instant(current_start_usecs); +// let start_callback_instant = start_cycle_instant +// .add(frames_to_duration( +// process_scope.frames_since_cycle_start() as usize, +// self.sample_rate, +// )) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + +// if let Some(input_callback) = &mut self.input_data_callback { +// // Let's get the data from the input ports and run the callback + +// let num_in_channels = self.in_ports.len(); + +// // Read the data from the input ports into the temporary buffer +// // Go through every channel and store its data in the temporary input buffer +// for ch_ix in 0..num_in_channels { +// let input_channel = &self.in_ports[ch_ix].as_slice(process_scope); +// for i in 0..current_frame_count { +// self.temp_input_buffer[ch_ix + i * num_in_channels] = input_channel[i]; +// } +// } +// // Create a slice of exactly current_frame_count frames +// let data = temp_buffer_to_data( +// &mut self.temp_input_buffer, +// current_frame_count * num_in_channels, +// ); +// // Create timestamp +// let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; +// let duration_since_cycle_start = +// frames_to_duration(frames_since_cycle_start, self.sample_rate); +// let callback = start_callback_instant +// .add(duration_since_cycle_start) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); +// let capture = start_callback_instant; +// let timestamp = crate::InputStreamTimestamp { callback, capture }; +// let info = crate::InputCallbackInfo { timestamp }; +// input_callback(&data, &info); +// } + +// if let Some(output_callback) = &mut self.output_data_callback { +// let num_out_channels = self.out_ports.len(); + +// // Create a slice of exactly current_frame_count frames +// let mut data = temp_buffer_to_data( +// &mut self.temp_output_buffer, +// current_frame_count * num_out_channels, +// ); +// // Create timestamp +// let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; +// let duration_since_cycle_start = +// frames_to_duration(frames_since_cycle_start, self.sample_rate); +// let callback = start_callback_instant +// .add(duration_since_cycle_start) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); +// let buffer_duration = frames_to_duration(current_frame_count, self.sample_rate); +// let playback = start_cycle_instant +// .add(buffer_duration) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); +// let timestamp = crate::OutputStreamTimestamp { callback, playback }; +// let info = crate::OutputCallbackInfo { timestamp }; +// output_callback(&mut data, &info); + +// // Deinterlace +// for ch_ix in 0..num_out_channels { +// let output_channel = &mut self.out_ports[ch_ix].as_mut_slice(process_scope); +// for i in 0..current_frame_count { +// output_channel[i] = self.temp_output_buffer[ch_ix + i * num_out_channels]; +// } +// } +// } + +// // Continue as normal +// jack::Control::Continue +// } + +// fn buffer_size(&mut self, _: &jack::Client, size: jack::Frames) -> jack::Control { +// // The `buffer_size` callback is actually called on the process thread, but +// // it does not need to be suitable for real-time use. Thus we can simply allocate +// // new buffers here. It is also fine to call the error callback. +// // Details: https://github.com/RustAudio/rust-jack/issues/137 +// let new_size = size as usize; +// if new_size != self.buffer_size { +// self.buffer_size = new_size; +// self.temp_input_buffer = vec![0.0; self.in_ports.len() * new_size]; +// self.temp_output_buffer = vec![0.0; self.out_ports.len() * new_size]; +// let description = format!("buffer size changed to: {}", new_size); +// if let Ok(mut mutex_guard) = self.error_callback_ptr.lock() { +// let err = &mut *mutex_guard; +// err(BackendSpecificError { description }.into()); +// } +// } + +// jack::Control::Continue +// } +// } fn micros_to_stream_instant(micros: u64) -> crate::StreamInstant { let nanos = micros * 1000; @@ -436,29 +436,29 @@ impl JackNotificationHandler { } } -impl jack::NotificationHandler for JackNotificationHandler { - fn shutdown(&mut self, _status: jack::ClientStatus, reason: &str) { - self.send_error(format!("JACK was shut down for reason: {}", reason)); - } - - fn sample_rate(&mut self, _: &jack::Client, srate: jack::Frames) -> jack::Control { - match self.init_sample_rate_flag.load(Ordering::SeqCst) { - false => { - // One of these notifications is sent every time a client is started. - self.init_sample_rate_flag.store(true, Ordering::SeqCst); - jack::Control::Continue - } - true => { - self.send_error(format!("sample rate changed to: {}", srate)); - // Since CPAL currently has no way of signaling a sample rate change in order to make - // all necessary changes that would bring we choose to quit. - jack::Control::Quit - } - } - } - - fn xrun(&mut self, _: &jack::Client) -> jack::Control { - self.send_error(String::from("xrun (buffer over or under run)")); - jack::Control::Continue - } -} +// impl jack::NotificationHandler for JackNotificationHandler { +// fn shutdown(&mut self, _status: jack::ClientStatus, reason: &str) { +// self.send_error(format!("JACK was shut down for reason: {}", reason)); +// } + +// fn sample_rate(&mut self, _: &jack::Client, srate: jack::Frames) -> jack::Control { +// match self.init_sample_rate_flag.load(Ordering::SeqCst) { +// false => { +// // One of these notifications is sent every time a client is started. +// self.init_sample_rate_flag.store(true, Ordering::SeqCst); +// jack::Control::Continue +// } +// true => { +// self.send_error(format!("sample rate changed to: {}", srate)); +// // Since CPAL currently has no way of signaling a sample rate change in order to make +// // all necessary changes that would bring we choose to quit. +// jack::Control::Quit +// } +// } +// } + +// fn xrun(&mut self, _: &jack::Client) -> jack::Control { +// self.send_error(String::from("xrun (buffer over or under run)")); +// jack::Control::Continue +// } +// } From 61aa078548f722cd2b383ccfc259da927bd50ed4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maria=C5=84ski?= Date: Thu, 10 Mar 2022 00:51:44 +0100 Subject: [PATCH 7/7] keep references to node listeners --- src/host/pipewire/conn.rs | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/host/pipewire/conn.rs b/src/host/pipewire/conn.rs index 75b218dee..2586dec2a 100644 --- a/src/host/pipewire/conn.rs +++ b/src/host/pipewire/conn.rs @@ -96,7 +96,7 @@ impl PWClient { #[derive(Default)] struct State { settings: Settings, - nodes: Vec, + nodes: Vec, } #[derive(Default, Clone, Debug)] @@ -125,7 +125,7 @@ fn pw_thread( pipewire::init(); // let state = Rc::new(State::default()); let state = Rc::new(RefCell::new(State::default())); - let proxies = Rc::new(RefCell::new(HashMap::::new())); + let proxies = Rc::new(RefCell::new(HashMap::new())); let mainloop = pipewire::MainLoop::new().expect("Failed to create PipeWire Mainloop"); @@ -142,6 +142,7 @@ fn pw_thread( let state = state.clone(); let main_sender = main_sender.clone(); let core = core.clone(); + let proxies = proxies.clone(); move |msg| match msg { Message::Terminate => mainloop.quit(), @@ -177,8 +178,8 @@ fn pw_thread( let _listener = node .add_listener_local() - .info(|f| { - println!("{:?}", f); + .info(|info| { + // println!("{:?}", info); }) .param(|a, b, c, d| { println!("{}, {}, {}, {}", a, b, c, d); @@ -187,15 +188,10 @@ fn pw_thread( println!("{:?}", node); - state.as_ref().borrow_mut().nodes.push(node); - - // proxies.as_ref().borrow_mut().insert( - // node.proxy.id(), - // ProxyItem::Node { - // _proxy: node, - // _listener, - // }, - // ); + state.as_ref().borrow_mut().nodes.push(ProxyItem::Node { + _proxy: node, + _listener, + }); main_sender.send(MessageRepl::NodeInfo(NodeInfo { name })); }