Skip to content

Commit

Permalink
add mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
nbsp committed Sep 23, 2024
1 parent 8667110 commit 32cf737
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
11 changes: 10 additions & 1 deletion packages/livekit-rtc/src/audio_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js';
import type { AudioStreamInfo, NewAudioStreamResponse } from './proto/audio_frame_pb.js';
import { AudioStreamType, NewAudioStreamRequest } from './proto/audio_frame_pb.js';
import type { Track } from './track.js';
import { Mutex } from './utils.js';

export class AudioStream implements AsyncIterableIterator<AudioFrame> {
/** @internal */
Expand All @@ -17,6 +18,8 @@ export class AudioStream implements AsyncIterableIterator<AudioFrame> {
eventQueue: (AudioFrame | null)[] = [];
/** @internal */
queueResolve: ((value: IteratorResult<AudioFrame>) => void) | null = null;
/** @internal */
mutex = new Mutex();

track: Track;
sampleRate: number;
Expand Down Expand Up @@ -72,15 +75,21 @@ export class AudioStream implements AsyncIterableIterator<AudioFrame> {
};

async next(): Promise<IteratorResult<AudioFrame>> {
const unlock = await this.mutex.lock();
if (this.eventQueue.length > 0) {
unlock();
const value = this.eventQueue.shift();
if (value) {
return { done: false, value };
} else {
return { done: true, value: undefined };
}
}
return new Promise((resolve) => (this.queueResolve = resolve));
const promise = new Promise<IteratorResult<AudioFrame>>(
(resolve) => (this.queueResolve = resolve),
);
unlock();
return promise;
}

close() {
Expand Down
36 changes: 36 additions & 0 deletions packages/livekit-rtc/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
export class Mutex {
#locking: Promise<void>;
#locks: number;
#limit: number;

constructor(limit = 1) {
this.#locking = Promise.resolve();
this.#locks = 0;
this.#limit = limit;
}

isLocked(): boolean {
return this.#locks >= this.#limit;
}

async lock(): Promise<() => void> {
this.#locks += 1;

let unlockNext: () => void;

const willLock = new Promise<void>(
(resolve) =>
(unlockNext = () => {
this.#locks -= 1;
resolve();
}),
);

const willUnlock = this.#locking.then(() => unlockNext);
this.#locking = this.#locking.then(() => willLock);
return willUnlock;
}
}

0 comments on commit 32cf737

Please sign in to comment.