From a31856fcdd967ff2c02f77c1ff110510197ab305 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 26 Jun 2024 08:37:05 -0700 Subject: [PATCH] Fix issues with test livekit client --- Cargo.lock | 1 - crates/live_kit_client/Cargo.toml | 2 - crates/live_kit_client/src/live_kit_client.rs | 37 ++++++++++--------- crates/live_kit_client/src/test.rs | 36 ++++++++---------- 4 files changed, 34 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 405d7705584943..247ce14d75725e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6263,7 +6263,6 @@ name = "live_kit_client" version = "0.1.0" dependencies = [ "anyhow", - "async-broadcast", "async-trait", "collections", "core-foundation", diff --git a/crates/live_kit_client/Cargo.toml b/crates/live_kit_client/Cargo.toml index 1ee8f82ac11131..b5485a0b018319 100644 --- a/crates/live_kit_client/Cargo.toml +++ b/crates/live_kit_client/Cargo.toml @@ -28,7 +28,6 @@ test-support = [ [dependencies] anyhow.workspace = true -async-broadcast = "0.7" async-trait = { workspace = true, optional = true } cpal = { git = "https://github.com/zed-industries/cpal", rev = "fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50" } collections = { workspace = true } @@ -47,7 +46,6 @@ util.workspace = true core-foundation.workspace = true [target.'cfg(all(not(target_os = "macos")))'.dependencies] -async-trait = { workspace = true } collections = { workspace = true } gpui = { workspace = true } live_kit_server.workspace = true diff --git a/crates/live_kit_client/src/live_kit_client.rs b/crates/live_kit_client/src/live_kit_client.rs index c225153f25b04a..36dc61d2b27fc4 100644 --- a/crates/live_kit_client/src/live_kit_client.rs +++ b/crates/live_kit_client/src/live_kit_client.rs @@ -13,6 +13,7 @@ use gpui::{AppContext, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStr use media::core_video::{CVImageBuffer, CVImageBufferRef}; use parking_lot::Mutex; use std::{borrow::Cow, sync::Arc}; +use util::ResultExt as _; use webrtc::{ audio_frame::AudioFrame, audio_source::{native::NativeAudioSource, AudioSourceOptions, RtcAudioSource}, @@ -29,23 +30,27 @@ pub use test::*; pub use remote_video_track_view::{RemoteVideoTrackView, RemoteVideoTrackViewEvent}; -pub fn init(dispatcher: Arc) { - struct Dispatcher(Arc); +pub struct AudioStream { + _tasks: [Task<()>; 2], +} - impl livekit::dispatcher::Dispatcher for Dispatcher { - fn dispatch(&self, runnable: livekit::dispatcher::Runnable) { - self.0.dispatch(runnable, None); - } +struct Dispatcher(Arc); - fn dispatch_after( - &self, - duration: std::time::Duration, - runnable: livekit::dispatcher::Runnable, - ) { - self.0.dispatch_after(duration, runnable); - } +impl livekit::dispatcher::Dispatcher for Dispatcher { + fn dispatch(&self, runnable: livekit::dispatcher::Runnable) { + self.0.dispatch(runnable, None); + } + + fn dispatch_after( + &self, + duration: std::time::Duration, + runnable: livekit::dispatcher::Runnable, + ) { + self.0.dispatch_after(duration, runnable); } +} +pub fn init(dispatcher: Arc) { livekit::dispatcher::set_dispatcher(Dispatcher(dispatcher)); } @@ -131,7 +136,7 @@ pub fn capture_local_audio_track( .expect("Failed to build input stream"); let stream_task = cx.foreground_executor().spawn(async move { - stream.play().expect("Failed to play stream"); + stream.play().log_err(); futures::future::pending().await }); @@ -155,10 +160,6 @@ pub fn capture_local_audio_track( )) } -pub struct AudioStream { - _tasks: [Task<()>; 2], -} - pub fn play_remote_audio_track( track: &track::RemoteAudioTrack, cx: &mut AppContext, diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index f50408ffb67476..8096e1fa74844b 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -11,7 +11,7 @@ use gpui::BackgroundExecutor; use live_kit_server::{proto, token}; use livekit::options::TrackPublishOptions; use parking_lot::Mutex; -use postage::{mpsc, sink::Sink, watch}; +use postage::{mpsc, sink::Sink}; use std::sync::{ atomic::{AtomicBool, Ordering::SeqCst}, Arc, Weak, @@ -238,7 +238,7 @@ impl TestServer { let mut server_rooms = self.rooms.lock(); for room in server_rooms.values_mut() { if let Some(room) = room.client_rooms.remove(&client_identity) { - *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected; + room.0.lock().connection_state = ConnectionState::Disconnected; } } } @@ -647,10 +647,7 @@ struct RoomState { url: String, token: String, local_identity: ParticipantIdentity, - connection: ( - watch::Sender, - watch::Receiver, - ), + connection_state: ConnectionState, paused_audio_tracks: HashSet, updates_tx: mpsc::Sender, } @@ -668,16 +665,12 @@ impl std::fmt::Debug for RoomState { } impl Room { - pub fn status(&self) -> watch::Receiver { - self.0.lock().connection.1.clone() - } - fn downgrade(&self) -> WeakRoom { WeakRoom(Arc::downgrade(&self.0)) } pub fn connection_state(&self) -> ConnectionState { - self.0.lock().connection.1.borrow().clone() + self.0.lock().connection_state } pub fn local_participant(&self) -> LocalParticipant { @@ -699,7 +692,7 @@ impl Room { local_identity: ParticipantIdentity(String::new()), url: url.to_string(), token: token.to_string(), - connection: watch::channel_with(ConnectionState::Disconnected), + connection_state: ConnectionState::Disconnected, paused_audio_tracks: Default::default(), updates_tx, }))); @@ -711,7 +704,7 @@ impl Room { { let mut state = this.0.lock(); state.local_identity = identity; - *state.connection.0.borrow_mut() = ConnectionState::Connected; + state.connection_state = ConnectionState::Connected; } Ok((this, updates_rx)) @@ -746,15 +739,16 @@ impl Room { } } -impl Drop for Room { +impl Drop for RoomState { fn drop(&mut self) { - let state = self.0.lock(); - if let Ok(server) = TestServer::get(&state.url) { - let executor = server.executor.clone(); - let token = state.token.clone(); - executor - .spawn(async move { server.leave_room(token).await.unwrap() }) - .detach(); + if self.connection_state == ConnectionState::Connected { + if let Ok(server) = TestServer::get(&self.url) { + let executor = server.executor.clone(); + let token = self.token.clone(); + executor + .spawn(async move { server.leave_room(token).await.unwrap() }) + .detach(); + } } } }