diff --git a/packages/livekit-rtc/src/audio_stream.ts b/packages/livekit-rtc/src/audio_stream.ts index 40a66176..809220ad 100644 --- a/packages/livekit-rtc/src/audio_stream.ts +++ b/packages/livekit-rtc/src/audio_stream.ts @@ -7,6 +7,7 @@ import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js'; import type { AudioStreamInfo, NewAudioStreamResponse } from './proto/audio_frame_pb.js'; import { AudioStreamType, NewAudioStreamRequest } from './proto/audio_frame_pb.js'; import type { Track } from './track.js'; +import { Mutex } from './utils.js'; export class AudioStream implements AsyncIterableIterator { /** @internal */ @@ -17,6 +18,8 @@ export class AudioStream implements AsyncIterableIterator { eventQueue: (AudioFrame | null)[] = []; /** @internal */ queueResolve: ((value: IteratorResult) => void) | null = null; + /** @internal */ + mutex = new Mutex(); track: Track; sampleRate: number; @@ -72,7 +75,9 @@ export class AudioStream implements AsyncIterableIterator { }; async next(): Promise> { + const unlock = await this.mutex.lock(); if (this.eventQueue.length > 0) { + unlock(); const value = this.eventQueue.shift(); if (value) { return { done: false, value }; @@ -80,7 +85,11 @@ export class AudioStream implements AsyncIterableIterator { return { done: true, value: undefined }; } } - return new Promise((resolve) => (this.queueResolve = resolve)); + const promise = new Promise>( + (resolve) => (this.queueResolve = resolve), + ); + unlock(); + return promise; } close() { diff --git a/packages/livekit-rtc/src/utils.ts b/packages/livekit-rtc/src/utils.ts new file mode 100644 index 00000000..7615e051 --- /dev/null +++ b/packages/livekit-rtc/src/utils.ts @@ -0,0 +1,36 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +export class Mutex { + #locking: Promise; + #locks: number; + #limit: number; + + constructor(limit = 1) { + this.#locking = Promise.resolve(); + this.#locks = 0; + this.#limit = limit; + } + + isLocked(): boolean { + return this.#locks >= this.#limit; + } + + async lock(): Promise<() => void> { + this.#locks += 1; + + let unlockNext: () => void; + + const willLock = new Promise( + (resolve) => + (unlockNext = () => { + this.#locks -= 1; + resolve(); + }), + ); + + const willUnlock = this.#locking.then(() => unlockNext); + this.#locking = this.#locking.then(() => willLock); + return willUnlock; + } +}