Skip to content

Commit

Permalink
Fix issues with test livekit client
Browse files Browse the repository at this point in the history
  • Loading branch information
maxbrunsfeld committed Jun 26, 2024
1 parent bc3bb5c commit a31856f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 42 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions crates/live_kit_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ test-support = [

[dependencies]
anyhow.workspace = true
async-broadcast = "0.7"
async-trait = { workspace = true, optional = true }
cpal = { git = "https://github.com/zed-industries/cpal", rev = "fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50" }
collections = { workspace = true }
Expand All @@ -47,7 +46,6 @@ util.workspace = true
core-foundation.workspace = true

[target.'cfg(all(not(target_os = "macos")))'.dependencies]
async-trait = { workspace = true }
collections = { workspace = true }
gpui = { workspace = true }
live_kit_server.workspace = true
Expand Down
37 changes: 19 additions & 18 deletions crates/live_kit_client/src/live_kit_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use gpui::{AppContext, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStr
use media::core_video::{CVImageBuffer, CVImageBufferRef};
use parking_lot::Mutex;
use std::{borrow::Cow, sync::Arc};
use util::ResultExt as _;
use webrtc::{
audio_frame::AudioFrame,
audio_source::{native::NativeAudioSource, AudioSourceOptions, RtcAudioSource},
Expand All @@ -29,23 +30,27 @@ pub use test::*;

pub use remote_video_track_view::{RemoteVideoTrackView, RemoteVideoTrackViewEvent};

pub fn init(dispatcher: Arc<dyn gpui::PlatformDispatcher>) {
struct Dispatcher(Arc<dyn gpui::PlatformDispatcher>);
pub struct AudioStream {
_tasks: [Task<()>; 2],
}

impl livekit::dispatcher::Dispatcher for Dispatcher {
fn dispatch(&self, runnable: livekit::dispatcher::Runnable) {
self.0.dispatch(runnable, None);
}
struct Dispatcher(Arc<dyn gpui::PlatformDispatcher>);

fn dispatch_after(
&self,
duration: std::time::Duration,
runnable: livekit::dispatcher::Runnable,
) {
self.0.dispatch_after(duration, runnable);
}
impl livekit::dispatcher::Dispatcher for Dispatcher {
fn dispatch(&self, runnable: livekit::dispatcher::Runnable) {
self.0.dispatch(runnable, None);
}

fn dispatch_after(
&self,
duration: std::time::Duration,
runnable: livekit::dispatcher::Runnable,
) {
self.0.dispatch_after(duration, runnable);
}
}

pub fn init(dispatcher: Arc<dyn gpui::PlatformDispatcher>) {
livekit::dispatcher::set_dispatcher(Dispatcher(dispatcher));
}

Expand Down Expand Up @@ -131,7 +136,7 @@ pub fn capture_local_audio_track(
.expect("Failed to build input stream");

let stream_task = cx.foreground_executor().spawn(async move {
stream.play().expect("Failed to play stream");
stream.play().log_err();
futures::future::pending().await
});

Expand All @@ -155,10 +160,6 @@ pub fn capture_local_audio_track(
))
}

pub struct AudioStream {
_tasks: [Task<()>; 2],
}

pub fn play_remote_audio_track(
track: &track::RemoteAudioTrack,
cx: &mut AppContext,
Expand Down
36 changes: 15 additions & 21 deletions crates/live_kit_client/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use gpui::BackgroundExecutor;
use live_kit_server::{proto, token};
use livekit::options::TrackPublishOptions;
use parking_lot::Mutex;
use postage::{mpsc, sink::Sink, watch};
use postage::{mpsc, sink::Sink};
use std::sync::{
atomic::{AtomicBool, Ordering::SeqCst},
Arc, Weak,
Expand Down Expand Up @@ -238,7 +238,7 @@ impl TestServer {
let mut server_rooms = self.rooms.lock();
for room in server_rooms.values_mut() {
if let Some(room) = room.client_rooms.remove(&client_identity) {
*room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
room.0.lock().connection_state = ConnectionState::Disconnected;
}
}
}
Expand Down Expand Up @@ -647,10 +647,7 @@ struct RoomState {
url: String,
token: String,
local_identity: ParticipantIdentity,
connection: (
watch::Sender<ConnectionState>,
watch::Receiver<ConnectionState>,
),
connection_state: ConnectionState,
paused_audio_tracks: HashSet<TrackSid>,
updates_tx: mpsc::Sender<RoomEvent>,
}
Expand All @@ -668,16 +665,12 @@ impl std::fmt::Debug for RoomState {
}

impl Room {
pub fn status(&self) -> watch::Receiver<ConnectionState> {
self.0.lock().connection.1.clone()
}

fn downgrade(&self) -> WeakRoom {
WeakRoom(Arc::downgrade(&self.0))
}

pub fn connection_state(&self) -> ConnectionState {
self.0.lock().connection.1.borrow().clone()
self.0.lock().connection_state
}

pub fn local_participant(&self) -> LocalParticipant {
Expand All @@ -699,7 +692,7 @@ impl Room {
local_identity: ParticipantIdentity(String::new()),
url: url.to_string(),
token: token.to_string(),
connection: watch::channel_with(ConnectionState::Disconnected),
connection_state: ConnectionState::Disconnected,
paused_audio_tracks: Default::default(),
updates_tx,
})));
Expand All @@ -711,7 +704,7 @@ impl Room {
{
let mut state = this.0.lock();
state.local_identity = identity;
*state.connection.0.borrow_mut() = ConnectionState::Connected;
state.connection_state = ConnectionState::Connected;
}

Ok((this, updates_rx))
Expand Down Expand Up @@ -746,15 +739,16 @@ impl Room {
}
}

impl Drop for Room {
impl Drop for RoomState {
fn drop(&mut self) {
let state = self.0.lock();
if let Ok(server) = TestServer::get(&state.url) {
let executor = server.executor.clone();
let token = state.token.clone();
executor
.spawn(async move { server.leave_room(token).await.unwrap() })
.detach();
if self.connection_state == ConnectionState::Connected {
if let Ok(server) = TestServer::get(&self.url) {
let executor = server.executor.clone();
let token = self.token.clone();
executor
.spawn(async move { server.leave_room(token).await.unwrap() })
.detach();
}
}
}
}
Expand Down

0 comments on commit a31856f

Please sign in to comment.