diff --git a/Cargo.toml b/Cargo.toml index 15898dbda..e22094296 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,8 @@ nix = "0.23" libc = "0.2.65" parking_lot = "0.12" jack = { version = "0.9", optional = true } +# pipewire = { version = "0.4", optional = true } +pipewire = { git = "https://gitlab.freedesktop.org/pipewire/pipewire-rs", optional = true } [target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies] core-foundation-sys = "0.8.2" # For linking to CoreFoundation.framework and handling device name `CFString`s. @@ -80,4 +82,4 @@ name = "feedback" name = "record_wav" [[example]] -name = "synth_tones" +name = "synth_tones" \ No newline at end of file diff --git a/src/host/mod.rs b/src/host/mod.rs index 555b01022..bae7309a9 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -11,6 +11,11 @@ pub(crate) mod emscripten; feature = "jack" ))] pub(crate) mod jack; +#[cfg(all( + any(target_os = "linux", target_os = "dragonfly", target_os = "freebsd"), + feature = "pipewire" +))] +pub(crate) mod pipewire; pub(crate) mod null; #[cfg(target_os = "android")] pub(crate) mod oboe; diff --git a/src/host/pipewire/conn.rs b/src/host/pipewire/conn.rs new file mode 100644 index 000000000..2586dec2a --- /dev/null +++ b/src/host/pipewire/conn.rs @@ -0,0 +1,284 @@ +extern crate pipewire; + +use self::pipewire::{ + metadata::{Metadata, MetadataListener}, + node::{Node, NodeListener}, + prelude::*, + proxy::Listener, + registry::{GlobalObject, Registry}, + spa::{Direction, ForeignDict}, + types::ObjectType, + Core, MainLoop, +}; + +use std::{ + borrow::BorrowMut, + cell::{Cell, RefCell}, + collections::HashMap, + rc::Rc, + sync::mpsc, + thread, + time::Duration, +}; + +use super::device::DeviceType; + +#[derive(Debug)] +enum Message { + Terminate, + GetSettings, + CreateDeviceNode { + name: String, + device_type: DeviceType, + autoconnect: bool, + }, +} + +enum MessageRepl { + Settings(Settings), + NodeInfo(NodeInfo), +} + +pub struct NodeInfo { + pub name: String, +} + +pub struct PWClient { + pw_sender: pipewire::channel::Sender, + main_receiver: mpsc::Receiver, +} + +impl PWClient { + pub fn new() -> Self { + let (main_sender, main_receiver) = mpsc::channel(); + let (pw_sender, pw_receiver) = pipewire::channel::channel(); + + let _pw_thread = thread::spawn(move || pw_thread(main_sender, pw_receiver)); + + Self { + pw_sender, + main_receiver, + } + } + + pub fn get_settings(&self) -> Result { + match self.pw_sender.send(Message::GetSettings) { + Ok(_) => match self.main_receiver.recv() { + Ok(MessageRepl::Settings(settings)) => Ok(settings), + Err(err) => Err(format!("{:?}", err)), + _ => Err(format!("")), + }, + Err(err) => Err(format!("{:?}", err)), + } + } + + pub fn create_device_node( + &self, + name: String, + device_type: DeviceType, + connect_ports_automatically: bool, + ) -> Result { + match self.pw_sender.send(Message::CreateDeviceNode { + name, + device_type, + autoconnect: connect_ports_automatically, + }) { + Ok(_) => match self.main_receiver.recv() { + Ok(MessageRepl::NodeInfo(info)) => Ok(info), + Err(err) => Err(format!("{:?}", err)), + _ => Err(format!("")), + }, + Err(err) => Err(format!("{:?}", err)), + } + } +} + +#[derive(Default)] +struct State { + settings: Settings, + nodes: Vec, +} + +#[derive(Default, Clone, Debug)] +pub struct Settings { + pub sample_rate: u32, + pub min_buffer_size: u32, + pub max_buffer_size: u32, + pub default_buffer_size: u32, +} + +enum ProxyItem { + Metadata { + _proxy: Metadata, + _listener: MetadataListener, + }, + Node { + _proxy: Node, + _listener: NodeListener, + }, +} + +fn pw_thread( + main_sender: mpsc::Sender, + pw_receiver: pipewire::channel::Receiver, +) { + pipewire::init(); + // let state = Rc::new(State::default()); + let state = Rc::new(RefCell::new(State::default())); + let proxies = Rc::new(RefCell::new(HashMap::new())); + + let mainloop = pipewire::MainLoop::new().expect("Failed to create PipeWire Mainloop"); + + let context = pipewire::Context::new(&mainloop).expect("Failed to create PipeWire Context"); + let core = Rc::new( + context + .connect(None) + .expect("Failed to connect to PipeWire"), + ); + let registry = Rc::new(core.get_registry().expect("Failed to get Registry")); + + let _receiver = pw_receiver.attach(&mainloop, { + let mainloop = mainloop.clone(); + let state = state.clone(); + let main_sender = main_sender.clone(); + let core = core.clone(); + let proxies = proxies.clone(); + + move |msg| match msg { + Message::Terminate => mainloop.quit(), + Message::GetSettings => { + let settings = state.borrow().settings.clone(); + main_sender.send(MessageRepl::Settings(settings)); + } + Message::CreateDeviceNode { + name, + device_type, + autoconnect, + } => { + let node: Node = core + .create_object( + "adapter", //node_factory.get().expect("No node factory found"), + &pipewire::properties! { + *pipewire::keys::NODE_NAME => name.clone(), + *pipewire::keys::FACTORY_NAME => "support.null-audio-sink", + *pipewire::keys::MEDIA_TYPE => "Audio", + *pipewire::keys::MEDIA_CATEGORY => match device_type { + DeviceType::InputDevice => "Capture", + DeviceType::OutputDevice => "Playback" + }, + *pipewire::keys::NODE_AUTOCONNECT => match autoconnect { + false => "false", + true => "true", + }, + // Don't remove the object on the remote when we destroy our proxy. + // *pipewire::keys::OBJECT_LINGER => "1" + }, + ) + .expect("Failed to create object"); + + let _listener = node + .add_listener_local() + .info(|info| { + // println!("{:?}", info); + }) + .param(|a, b, c, d| { + println!("{}, {}, {}, {}", a, b, c, d); + }) + .register(); + + println!("{:?}", node); + + state.as_ref().borrow_mut().nodes.push(ProxyItem::Node { + _proxy: node, + _listener, + }); + + main_sender.send(MessageRepl::NodeInfo(NodeInfo { name })); + } + } + }); + + let _reg_listener = registry + .add_listener_local() + .global({ + let state = state.clone(); + let registry = registry.clone(); + let proxies = proxies.clone(); + + move |global| match global.type_ { + ObjectType::Metadata => handle_metadata(global, &state, ®istry, &proxies), + _ => {} + } + }) + .register(); + + // let timer = mainloop.add_timer({ + // move |_| { + // } + // }); + + // timer + // .update_timer( + // Some(Duration::from_millis(500)), + // Some(Duration::from_secs(1)), + // ) + // .into_result() + // .expect("FU"); + + mainloop.run(); +} + +fn handle_metadata( + metadata: &GlobalObject, + state: &Rc>, + registry: &Rc, + proxies: &Rc>>, +) { + let props = metadata + .props + .as_ref() + .expect("Metadata object is missing properties"); + + match props.get("metadata.name") { + Some("settings") => { + let settings: Metadata = registry.bind(metadata).expect("Metadata"); + + let _listener = settings + .add_listener_local() + .property({ + let state = state.clone(); + move |_, key, _, value| { + let mut state = state.as_ref().borrow_mut(); + if let Some(value) = value { + if let Ok(value) = value.parse::() { + match key { + Some("clock.rate") => state.settings.sample_rate = value, + Some("clock.quantum") => { + state.settings.default_buffer_size = value + } + Some("clock.min-quantum") => { + state.settings.min_buffer_size = value + } + Some("clock.max-quantum") => { + state.settings.max_buffer_size = value + } + _ => {} + }; + } + } + 0 + } + }) + .register(); + + proxies.as_ref().borrow_mut().insert( + metadata.id, + ProxyItem::Metadata { + _proxy: settings, + _listener, + }, + ); + } + _ => {} + }; +} diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs new file mode 100644 index 000000000..9886e4c2b --- /dev/null +++ b/src/host/pipewire/device.rs @@ -0,0 +1,241 @@ +use crate::{ + BackendSpecificError, BuildStreamError, Data, DefaultStreamConfigError, DeviceNameError, + InputCallbackInfo, OutputCallbackInfo, SampleFormat, SampleRate, StreamConfig, StreamError, + SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange, + SupportedStreamConfigsError, +}; +use std::hash::{Hash, Hasher}; +use std::rc::Rc; +use traits::DeviceTrait; + +use super::stream::Stream; +use super::PIPEWIRE_SAMPLE_FORMAT; + +pub type SupportedInputConfigs = std::vec::IntoIter; +pub type SupportedOutputConfigs = std::vec::IntoIter; + +const DEFAULT_NUM_CHANNELS: u16 = 2; +const DEFAULT_SUPPORTED_CHANNELS: [u16; 10] = [1, 2, 4, 6, 8, 16, 24, 32, 48, 64]; + +/// If a device is for input or output. +/// Until we have duplex stream support PipeWire nodes and CPAL devices for PipeWire will be either input or output. +#[derive(Clone, Debug)] +pub enum DeviceType { + InputDevice, + OutputDevice, +} +#[derive(Clone)] +pub struct Device { + name: String, + sample_rate: SampleRate, + buffer_size: SupportedBufferSize, + device_type: DeviceType, + connect_ports_automatically: bool, + client: Rc +} + +impl Device { + fn new_device( + name: String, + connect_ports_automatically: bool, + device_type: DeviceType, + client: Rc, + ) -> Result { + while client.get_settings().and_then(|s| if s.sample_rate == 0 {Err(String::new())} else {Ok(true)} ).is_err() {} + + let settings = client.get_settings().unwrap(); + + let info = client.create_device_node(name, device_type.clone(), connect_ports_automatically).expect("Error creating device"); + + Ok(Device { + name: info.name, + sample_rate: SampleRate(settings.sample_rate), + buffer_size: SupportedBufferSize::Range { + min: settings.min_buffer_size, + max: settings.max_buffer_size, + }, + device_type, + connect_ports_automatically, + client + }) + } + + pub fn default_output_device( + name: &str, + connect_ports_automatically: bool, + client: Rc, + ) -> Result { + let output_client_name = format!("{}_out", name); + Device::new_device( + output_client_name, + connect_ports_automatically, + DeviceType::OutputDevice, + client, + ) + } + + pub fn default_input_device( + name: &str, + connect_ports_automatically: bool, + client: Rc, + ) -> Result { + let input_client_name = format!("{}_in", name); + Device::new_device( + input_client_name, + connect_ports_automatically, + DeviceType::InputDevice, + client, + ) + } + + pub fn default_config(&self) -> Result { + let channels = DEFAULT_NUM_CHANNELS; + let sample_rate = self.sample_rate; + let buffer_size = self.buffer_size.clone(); + // The sample format for JACK audio ports is always "32-bit float mono audio" in the current implementation. + // Custom formats are allowed within JACK, but this is of niche interest. + // The format can be found programmatically by calling jack::PortSpec::port_type() on a created port. + let sample_format = PIPEWIRE_SAMPLE_FORMAT; + Ok(SupportedStreamConfig { + channels, + sample_rate, + buffer_size, + sample_format, + }) + } + + pub fn supported_configs(&self) -> Vec { + let f = match self.default_config() { + Err(_) => return vec![], + Ok(f) => f, + }; + + let mut supported_configs = vec![]; + + for &channels in DEFAULT_SUPPORTED_CHANNELS.iter() { + supported_configs.push(SupportedStreamConfigRange { + channels, + min_sample_rate: f.sample_rate, + max_sample_rate: f.sample_rate, + buffer_size: f.buffer_size.clone(), + sample_format: f.sample_format, + }); + } + supported_configs + } + + pub fn is_input(&self) -> bool { + matches!(self.device_type, DeviceType::InputDevice) + } + + pub fn is_output(&self) -> bool { + matches!(self.device_type, DeviceType::OutputDevice) + } +} + +impl DeviceTrait for Device { + type SupportedInputConfigs = SupportedInputConfigs; + type SupportedOutputConfigs = SupportedOutputConfigs; + type Stream = Stream; + + fn name(&self) -> Result { + Ok(self.name.clone()) + } + + fn supported_input_configs( + &self, + ) -> Result { + Ok(self.supported_configs().into_iter()) + } + + fn supported_output_configs( + &self, + ) -> Result { + Ok(self.supported_configs().into_iter()) + } + + /// Returns the default input config + /// The sample format for JACK audio ports is always "32-bit float mono audio" unless using a custom type. + /// The sample rate is set by the JACK server. + fn default_input_config(&self) -> Result { + self.default_config() + } + + /// Returns the default output config + /// The sample format for JACK audio ports is always "32-bit float mono audio" unless using a custom type. + /// The sample rate is set by the JACK server. + fn default_output_config(&self) -> Result { + self.default_config() + } + + fn build_input_stream_raw( + &self, + conf: &StreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + ) -> Result + where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + if let DeviceType::OutputDevice = &self.device_type { + // Trying to create an input stream from an output device + return Err(BuildStreamError::StreamConfigNotSupported); + } + if conf.sample_rate != self.sample_rate || sample_format != PIPEWIRE_SAMPLE_FORMAT { + return Err(BuildStreamError::StreamConfigNotSupported); + } + + let mut stream = Stream::new_input(self.client.clone(), conf.channels, data_callback, error_callback); + + if self.connect_ports_automatically { + stream.connect_to_system_inputs(); + } + + Ok(stream) + } + + fn build_output_stream_raw( + &self, + conf: &StreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + ) -> Result + where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + if let DeviceType::InputDevice = &self.device_type { + // Trying to create an output stream from an input device + return Err(BuildStreamError::StreamConfigNotSupported); + } + if conf.sample_rate != self.sample_rate || sample_format != PIPEWIRE_SAMPLE_FORMAT { + return Err(BuildStreamError::StreamConfigNotSupported); + } + + let mut stream = Stream::new_output(self.client.clone(), conf.channels, data_callback, error_callback); + + if self.connect_ports_automatically { + stream.connect_to_system_outputs(); + } + + Ok(stream) + } +} + +impl PartialEq for Device { + fn eq(&self, other: &Self) -> bool { + // Device::name() can never fail in this implementation + self.name().unwrap() == other.name().unwrap() + } +} + +impl Eq for Device {} + +impl Hash for Device { + fn hash(&self, state: &mut H) { + self.name().unwrap().hash(state); + } +} diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs new file mode 100644 index 000000000..07b970117 --- /dev/null +++ b/src/host/pipewire/mod.rs @@ -0,0 +1,124 @@ +extern crate pipewire; + +use std::rc::Rc; +use std::sync::mpsc; + +use crate::{DevicesError, SampleFormat, SupportedStreamConfigRange}; +use traits::HostTrait; + +mod device; +pub use self::device::Device; +pub use self::stream::Stream; +mod stream; +mod conn; + +const PIPEWIRE_SAMPLE_FORMAT: SampleFormat = SampleFormat::F32; + +pub type SupportedInputConfigs = std::vec::IntoIter; +pub type SupportedOutputConfigs = std::vec::IntoIter; +pub type Devices = std::vec::IntoIter; + +/// The PipeWire Host type + +pub struct Host { + /// The name that the client will have in PipeWire. + /// Until we have duplex streams two clients will be created adding "out" or "in" to the name + /// since names have to be unique. + name: String, + /// If ports are to be connected to the system (soundcard) ports automatically (default is true). + connect_ports_automatically: bool, + /// A list of the devices that have been created from this Host. + devices_created: Vec, + + client: Rc +} + +impl Host { + pub fn new() -> Result { + let client = Rc::new(conn::PWClient::new()); + + let mut host = Host { + name: "cpal_client".to_owned(), + connect_ports_automatically: true, + devices_created: vec![], + client + }; + + // Devices don't exist for PipeWire, they have to be created + host.initialize_default_devices(); + Ok(host) + } + /// Set whether the ports should automatically be connected to system + /// (default is true) + pub fn set_connect_automatically(&mut self, do_connect: bool) { + self.connect_ports_automatically = do_connect; + } + + pub fn input_device_with_name(&mut self, name: &str) -> Option { + self.name = name.to_owned(); + self.default_input_device() + } + + pub fn output_device_with_name(&mut self, name: &str) -> Option { + self.name = name.to_owned(); + self.default_output_device() + } + + fn initialize_default_devices(&mut self) { + let in_device_res = Device::default_input_device( + &self.name, + self.connect_ports_automatically, + self.client.clone() + ); + + match in_device_res { + Ok(device) => self.devices_created.push(device), + Err(err) => { + println!("{}", err); + } + } + + let out_device_res = Device::default_output_device( + &self.name, + self.connect_ports_automatically, + self.client.clone() + ); + match out_device_res { + Ok(device) => self.devices_created.push(device), + Err(err) => { + println!("{}", err); + } + } + } +} + +impl HostTrait for Host { + type Devices = Devices; + type Device = Device; + + fn is_available() -> bool { + true + } + + fn devices(&self) -> Result { + Ok(self.devices_created.clone().into_iter()) + } + + fn default_input_device(&self) -> Option { + for device in &self.devices_created { + if device.is_input() { + return Some(device.clone()); + } + } + None + } + + fn default_output_device(&self) -> Option { + for device in &self.devices_created { + if device.is_output() { + return Some(device.clone()); + } + } + None + } +} \ No newline at end of file diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs new file mode 100644 index 000000000..f0bc1a892 --- /dev/null +++ b/src/host/pipewire/stream.rs @@ -0,0 +1,464 @@ +use crate::ChannelCount; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; +use traits::StreamTrait; + +use crate::{ + BackendSpecificError, Data, InputCallbackInfo, OutputCallbackInfo, PauseStreamError, + PlayStreamError, SampleRate, StreamError, +}; + +use super::PIPEWIRE_SAMPLE_FORMAT; + +type ErrorCallbackPtr = Arc>; + +pub struct Stream { + // TODO: It might be faster to send a message when playing/pausing than to check this every iteration + playing: Arc, + // async_client: jack::AsyncClient, + // Port names are stored in order to connect them to other ports in jack automatically + // input_port_names: Vec, + // output_port_names: Vec, +} + +impl Stream { + // TODO: Return error messages + pub fn new_input( + client: Rc, + channels: ChannelCount, + data_callback: D, + mut error_callback: E, + ) -> Stream + where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + // let mut ports = vec![]; + // let mut port_names: Vec = vec![]; + // Create ports + // for i in 0..channels { + // let port_try = client.register_port(&format!("in_{}", i), jack::AudioIn::default()); + // match port_try { + // Ok(port) => { + // // Get the port name in order to later connect it automatically + // if let Ok(port_name) = port.name() { + // port_names.push(port_name); + // } + // // Store the port into a Vec to move to the ProcessHandler + // ports.push(port); + // } + // Err(e) => { + // // If port creation failed, send the error back via the error_callback + // error_callback( + // BackendSpecificError { + // description: e.to_string(), + // } + // .into(), + // ); + // } + // } + // } + + let playing = Arc::new(AtomicBool::new(true)); + + let error_callback_ptr = Arc::new(Mutex::new(error_callback)) as ErrorCallbackPtr; + + // let input_process_handler = LocalProcessHandler::new( + // vec![], + // ports, + // SampleRate(client.sample_rate() as u32), + // client.buffer_size() as usize, + // Some(Box::new(data_callback)), + // None, + // playing.clone(), + // Arc::clone(&error_callback_ptr), + // ); + + let notification_handler = JackNotificationHandler::new(error_callback_ptr); + + // let async_client = client + // .activate_async(notification_handler, input_process_handler) + // .unwrap(); + + Stream { + playing, + // async_client, + // input_port_names: port_names, + // output_port_names: vec![], + } + } + + pub fn new_output( + client: Rc, + channels: ChannelCount, + data_callback: D, + mut error_callback: E, + ) -> Stream + where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + // let mut ports = vec![]; + // let mut port_names: Vec = vec![]; + // // Create ports + // for i in 0..channels { + // let port_try = client.register_port(&format!("out_{}", i), jack::AudioOut::default()); + // match port_try { + // Ok(port) => { + // // Get the port name in order to later connect it automatically + // if let Ok(port_name) = port.name() { + // port_names.push(port_name); + // } + // // Store the port into a Vec to move to the ProcessHandler + // ports.push(port); + // } + // Err(e) => { + // // If port creation failed, send the error back via the error_callback + // error_callback( + // BackendSpecificError { + // description: e.to_string(), + // } + // .into(), + // ); + // } + // } + // } + + let playing = Arc::new(AtomicBool::new(true)); + + let error_callback_ptr = Arc::new(Mutex::new(error_callback)) as ErrorCallbackPtr; + + // let output_process_handler = LocalProcessHandler::new( + // ports, + // vec![], + // SampleRate(client.sample_rate() as u32), + // client.buffer_size() as usize, + // None, + // Some(Box::new(data_callback)), + // playing.clone(), + // Arc::clone(&error_callback_ptr), + // ); + + // let notification_handler = JackNotificationHandler::new(error_callback_ptr); + + // let async_client = client + // .activate_async(notification_handler, output_process_handler) + // .unwrap(); + + Stream { + playing, + // async_client, + // input_port_names: vec![], + // output_port_names: port_names, + } + } + + /// Connect to the standard system outputs in jack, system:playback_1 and system:playback_2 + /// This has to be done after the client is activated, doing it just after creating the ports doesn't work. + pub fn connect_to_system_outputs(&mut self) { + // Get the system ports + // let system_ports = self.async_client.as_client().ports( + // Some("system:playback_.*"), + // None, + // jack::PortFlags::empty(), + // ); + + // // Connect outputs from this client to the system playback inputs + // for i in 0..self.output_port_names.len() { + // if i >= system_ports.len() { + // break; + // } + // match self + // .async_client + // .as_client() + // .connect_ports_by_name(&self.output_port_names[i], &system_ports[i]) + // { + // Ok(_) => (), + // Err(e) => println!("Unable to connect to port with error {}", e), + // } + // } + } + + /// Connect to the standard system outputs in jack, system:capture_1 and system:capture_2 + /// This has to be done after the client is activated, doing it just after creating the ports doesn't work. + pub fn connect_to_system_inputs(&mut self) { + // Get the system ports + // let system_ports = self.async_client.as_client().ports( + // Some("system:capture_.*"), + // None, + // jack::PortFlags::empty(), + // ); + + // // Connect outputs from this client to the system playback inputs + // for i in 0..self.input_port_names.len() { + // if i >= system_ports.len() { + // break; + // } + // match self + // .async_client + // .as_client() + // .connect_ports_by_name(&system_ports[i], &self.input_port_names[i]) + // { + // Ok(_) => (), + // Err(e) => println!("Unable to connect to port with error {}", e), + // } + // } + } +} + +impl StreamTrait for Stream { + fn play(&self) -> Result<(), PlayStreamError> { + self.playing.store(true, Ordering::SeqCst); + Ok(()) + } + + fn pause(&self) -> Result<(), PauseStreamError> { + self.playing.store(false, Ordering::SeqCst); + Ok(()) + } +} + +struct LocalProcessHandler { + /// No new ports are allowed to be created after the creation of the LocalProcessHandler as that would invalidate the buffer sizes + // out_ports: Vec>, + // in_ports: Vec>, + + sample_rate: SampleRate, + buffer_size: usize, + input_data_callback: Option>, + output_data_callback: Option>, + + // JACK audio samples are 32-bit float (unless you do some custom dark magic) + // temp_input_buffer: Vec, + // temp_output_buffer: Vec, + playing: Arc, + creation_timestamp: std::time::Instant, + /// This should not be called on `process`, only on `buffer_size` because it can block. + error_callback_ptr: ErrorCallbackPtr, +} + +impl LocalProcessHandler { + fn new( + // out_ports: Vec>, + // in_ports: Vec>, + sample_rate: SampleRate, + buffer_size: usize, + input_data_callback: Option>, + output_data_callback: Option< + Box, + >, + playing: Arc, + error_callback_ptr: ErrorCallbackPtr, + ) -> Self { + // These may be reallocated in the `buffer_size` callback. + // let temp_input_buffer = vec![0.0; in_ports.len() * buffer_size]; + // let temp_output_buffer = vec![0.0; out_ports.len() * buffer_size]; + + LocalProcessHandler { + // out_ports, + // in_ports, + sample_rate, + buffer_size, + input_data_callback, + output_data_callback, + // temp_input_buffer, + // temp_output_buffer, + playing, + creation_timestamp: std::time::Instant::now(), + error_callback_ptr, + } + } +} + +fn temp_buffer_to_data(temp_input_buffer: &mut Vec, total_buffer_size: usize) -> Data { + let slice = &temp_input_buffer[0..total_buffer_size]; + let data = slice.as_ptr() as *mut (); + let len = total_buffer_size; + let data = unsafe { Data::from_parts(data, len, PIPEWIRE_SAMPLE_FORMAT) }; + data +} + +// impl jack::ProcessHandler for LocalProcessHandler { +// fn process(&mut self, _: &jack::Client, process_scope: &jack::ProcessScope) -> jack::Control { +// if !self.playing.load(Ordering::SeqCst) { +// return jack::Control::Continue; +// } + +// // This should be equal to self.buffer_size, but the implementation will +// // work even if it is less. Will panic in `temp_buffer_to_data` if greater. +// let current_frame_count = process_scope.n_frames() as usize; + +// // Get timestamp data +// let cycle_times = process_scope.cycle_times(); +// let current_start_usecs = match cycle_times { +// Ok(times) => times.current_usecs, +// Err(_) => { +// // jack was unable to get the current time information +// // Fall back to using Instants +// let now = std::time::Instant::now(); +// let duration = now.duration_since(self.creation_timestamp); +// duration.as_micros() as u64 +// } +// }; +// let start_cycle_instant = micros_to_stream_instant(current_start_usecs); +// let start_callback_instant = start_cycle_instant +// .add(frames_to_duration( +// process_scope.frames_since_cycle_start() as usize, +// self.sample_rate, +// )) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + +// if let Some(input_callback) = &mut self.input_data_callback { +// // Let's get the data from the input ports and run the callback + +// let num_in_channels = self.in_ports.len(); + +// // Read the data from the input ports into the temporary buffer +// // Go through every channel and store its data in the temporary input buffer +// for ch_ix in 0..num_in_channels { +// let input_channel = &self.in_ports[ch_ix].as_slice(process_scope); +// for i in 0..current_frame_count { +// self.temp_input_buffer[ch_ix + i * num_in_channels] = input_channel[i]; +// } +// } +// // Create a slice of exactly current_frame_count frames +// let data = temp_buffer_to_data( +// &mut self.temp_input_buffer, +// current_frame_count * num_in_channels, +// ); +// // Create timestamp +// let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; +// let duration_since_cycle_start = +// frames_to_duration(frames_since_cycle_start, self.sample_rate); +// let callback = start_callback_instant +// .add(duration_since_cycle_start) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); +// let capture = start_callback_instant; +// let timestamp = crate::InputStreamTimestamp { callback, capture }; +// let info = crate::InputCallbackInfo { timestamp }; +// input_callback(&data, &info); +// } + +// if let Some(output_callback) = &mut self.output_data_callback { +// let num_out_channels = self.out_ports.len(); + +// // Create a slice of exactly current_frame_count frames +// let mut data = temp_buffer_to_data( +// &mut self.temp_output_buffer, +// current_frame_count * num_out_channels, +// ); +// // Create timestamp +// let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize; +// let duration_since_cycle_start = +// frames_to_duration(frames_since_cycle_start, self.sample_rate); +// let callback = start_callback_instant +// .add(duration_since_cycle_start) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); +// let buffer_duration = frames_to_duration(current_frame_count, self.sample_rate); +// let playback = start_cycle_instant +// .add(buffer_duration) +// .expect("`playback` occurs beyond representation supported by `StreamInstant`"); +// let timestamp = crate::OutputStreamTimestamp { callback, playback }; +// let info = crate::OutputCallbackInfo { timestamp }; +// output_callback(&mut data, &info); + +// // Deinterlace +// for ch_ix in 0..num_out_channels { +// let output_channel = &mut self.out_ports[ch_ix].as_mut_slice(process_scope); +// for i in 0..current_frame_count { +// output_channel[i] = self.temp_output_buffer[ch_ix + i * num_out_channels]; +// } +// } +// } + +// // Continue as normal +// jack::Control::Continue +// } + +// fn buffer_size(&mut self, _: &jack::Client, size: jack::Frames) -> jack::Control { +// // The `buffer_size` callback is actually called on the process thread, but +// // it does not need to be suitable for real-time use. Thus we can simply allocate +// // new buffers here. It is also fine to call the error callback. +// // Details: https://github.com/RustAudio/rust-jack/issues/137 +// let new_size = size as usize; +// if new_size != self.buffer_size { +// self.buffer_size = new_size; +// self.temp_input_buffer = vec![0.0; self.in_ports.len() * new_size]; +// self.temp_output_buffer = vec![0.0; self.out_ports.len() * new_size]; +// let description = format!("buffer size changed to: {}", new_size); +// if let Ok(mut mutex_guard) = self.error_callback_ptr.lock() { +// let err = &mut *mutex_guard; +// err(BackendSpecificError { description }.into()); +// } +// } + +// jack::Control::Continue +// } +// } + +fn micros_to_stream_instant(micros: u64) -> crate::StreamInstant { + let nanos = micros * 1000; + let secs = micros / 1_000_000; + let subsec_nanos = nanos - secs * 1_000_000_000; + crate::StreamInstant::new(secs as i64, subsec_nanos as u32) +} + +// Convert the given duration in frames at the given sample rate to a `std::time::Duration`. +fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration { + let secsf = frames as f64 / rate.0 as f64; + let secs = secsf as u64; + let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32; + std::time::Duration::new(secs, nanos) +} + +/// Receives notifications from the JACK server. It is unclear if this may be run concurrent with itself under JACK2 specs +/// so it needs to be Sync. +struct JackNotificationHandler { + error_callback_ptr: ErrorCallbackPtr, + init_sample_rate_flag: Arc, +} + +impl JackNotificationHandler { + pub fn new(error_callback_ptr: ErrorCallbackPtr) -> Self { + JackNotificationHandler { + error_callback_ptr, + init_sample_rate_flag: Arc::new(AtomicBool::new(false)), + } + } + + fn send_error(&mut self, description: String) { + // This thread isn't the audio thread, it's fine to block + if let Ok(mut mutex_guard) = self.error_callback_ptr.lock() { + let err = &mut *mutex_guard; + err(BackendSpecificError { description }.into()); + } + } +} + +// impl jack::NotificationHandler for JackNotificationHandler { +// fn shutdown(&mut self, _status: jack::ClientStatus, reason: &str) { +// self.send_error(format!("JACK was shut down for reason: {}", reason)); +// } + +// fn sample_rate(&mut self, _: &jack::Client, srate: jack::Frames) -> jack::Control { +// match self.init_sample_rate_flag.load(Ordering::SeqCst) { +// false => { +// // One of these notifications is sent every time a client is started. +// self.init_sample_rate_flag.store(true, Ordering::SeqCst); +// jack::Control::Continue +// } +// true => { +// self.send_error(format!("sample rate changed to: {}", srate)); +// // Since CPAL currently has no way of signaling a sample rate change in order to make +// // all necessary changes that would bring we choose to quit. +// jack::Control::Quit +// } +// } +// } + +// fn xrun(&mut self, _: &jack::Client) -> jack::Control { +// self.send_error(String::from("xrun (buffer over or under run)")); +// jack::Control::Continue +// } +// } diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 7c0e02083..93c78f1ff 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -497,8 +497,14 @@ mod platform_impl { SupportedInputConfigs as JackSupportedInputConfigs, SupportedOutputConfigs as JackSupportedOutputConfigs, }; + #[cfg(feature = "pipewire")] + pub use crate::host::pipewire::{ + Device as PipeWireDevice, Devices as PipeWireDevices, Host as PipeWireHost, Stream as PipeWireStream, + SupportedInputConfigs as PipeWireSupportedInputConfigs, + SupportedOutputConfigs as PipeWireupportedOutputConfigs, + }; - impl_platform_host!(#[cfg(feature = "jack")] Jack jack "JACK", Alsa alsa "ALSA"); + impl_platform_host!(#[cfg(feature = "pipewire")] PipeWire pipewire "PipeWire", #[cfg(feature = "jack")] Jack jack "JACK", Alsa alsa "ALSA"); /// The default host for the current compilation target platform. pub fn default_host() -> Host {